You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:31 UTC
[26/50] [abbrv] flink git commit: [FLINK-4538][FLINK-4348]
ResourceManager slot allocation protcol
http://git-wip-us.apache.org/repos/asf/flink/blob/e5894877/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
deleted file mode 100644
index 52d9d06..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class SlotManagerTest {
-
- private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
-
- private static final long DEFAULT_TESTING_MEMORY = 512;
-
- private static final ResourceProfile DEFAULT_TESTING_PROFILE =
- new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
-
- private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
- new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
-
- private ResourceManagerGateway resourceManagerGateway;
-
- @Before
- public void setUp() {
- resourceManagerGateway = mock(ResourceManagerGateway.class);
- }
-
- /**
- * Tests that there are no free slots when we request, need to allocate from cluster manager master
- */
- @Test
- public void testRequestSlotWithoutFreeSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertEquals(1, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
- }
-
- /**
- * Tests that there are some free slots when we request, and the request is fulfilled immediately
- */
- @Test
- public void testRequestSlotWithFreeSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
- assertEquals(1, slotManager.getFreeSlotCount());
-
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertEquals(0, slotManager.getAllocatedContainers().size());
- }
-
- /**
- * Tests that there are some free slots when we request, but none of them are suitable
- */
- @Test
- public void testRequestSlotWithoutSuitableSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
- assertEquals(2, slotManager.getFreeSlotCount());
-
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(2, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertEquals(1, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
- }
-
- /**
- * Tests that we send duplicated slot request
- */
- @Test
- public void testDuplicatedSlotRequest() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-
- SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
- slotManager.requestSlot(request1);
- slotManager.requestSlot(request2);
- slotManager.requestSlot(request2);
- slotManager.requestSlot(request1);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertEquals(1, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
- }
-
- /**
- * Tests that we send multiple slot requests
- */
- @Test
- public void testRequestMultipleSlots() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
-
- // request 3 normal slots
- for (int i = 0; i < 3; ++i) {
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- }
-
- // request 2 big slots
- for (int i = 0; i < 2; ++i) {
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
- }
-
- // request 1 normal slot again
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
- assertEquals(4, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(2, slotManager.getPendingRequestCount());
- assertEquals(2, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
- */
- @Test
- public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- assertEquals(1, slotManager.getPendingRequestCount());
-
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, but we have no pending request
- */
- @Test
- public void testNewlyAppearedFreeSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests
- */
- @Test
- public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
- assertEquals(1, slotManager.getPendingRequestCount());
-
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertFalse(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, and it's been reported using by some job
- */
- @Test
- public void testNewlyAppearedInUseSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertTrue(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that we had a slot in-use, and it's confirmed by SlotReport
- */
- @Test
- public void testExistingInUseSlotUpdateStatus() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- // make this slot in use
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertTrue(slotManager.isAllocated(slotId));
-
- // slot status is confirmed
- SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
- request.getAllocationId(), request.getJobId());
- slotManager.updateSlotStatus(slotStatus2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertTrue(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that we had a slot in-use, but it's empty according to the SlotReport
- */
- @Test
- public void testExistingInUseSlotAdjustedToEmpty() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request1);
-
- // make this slot in use
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- // another request pending
- SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertTrue(slotManager.isAllocated(request1.getAllocationId()));
-
-
- // but slot is reported empty again, request2 will be fulfilled, request1 will be missing
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertTrue(slotManager.isAllocated(request2.getAllocationId()));
- }
-
- /**
- * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
- * information didn't match.
- */
- @Test
- public void testExistingInUseSlotWithDifferentAllocationInfo() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- // make this slot in use
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
- SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
- // update slot status with different allocation info
- slotManager.updateSlotStatus(slotStatus2);
-
- // original request is missing and won't be allocated
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertFalse(slotManager.isAllocated(request.getAllocationId()));
- assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
- }
-
- /**
- * Tests that we had a free slot, and it's confirmed by SlotReport
- */
- @Test
- public void testExistingEmptySlotUpdateStatus() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- }
-
- /**
- * Tests that we had a free slot, and it's reported in-use by TaskManager
- */
- @Test
- public void testExistingEmptySlotAdjustedToInUse() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
- new AllocationID(), new JobID());
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slot.getSlotId()));
- }
-
- /**
- * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
- */
- @Test
- public void testSlotAllocationFailedAtTaskManager() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slot.getSlotId()));
-
- slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- }
-
-
- /**
- * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
- */
- @Test
- public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- // slot is set empty by heartbeat
- SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
- slotManager.updateSlotStatus(slotStatus);
-
- // another request took this slot
- SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertFalse(slotManager.isAllocated(request.getAllocationId()));
- assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
- // original request should be pended
- slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertFalse(slotManager.isAllocated(request.getAllocationId()));
- assertTrue(slotManager.isAllocated(request2.getAllocationId()));
- }
-
- @Test
- public void testNotifyTaskManagerFailure() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- ResourceID resource1 = ResourceID.generate();
- ResourceID resource2 = ResourceID.generate();
-
- ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
- ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
- ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
- ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
-
- slotManager.addFreeSlot(slot11);
- slotManager.addFreeSlot(slot21);
-
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
- assertEquals(2, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
-
- slotManager.addFreeSlot(slot12);
- slotManager.addFreeSlot(slot22);
-
- assertEquals(2, slotManager.getAllocatedSlotCount());
- assertEquals(2, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
-
- slotManager.notifyTaskManagerFailure(resource2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
-
- // notify an not exist resource failure
- slotManager.notifyTaskManagerFailure(ResourceID.generate());
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- }
-
- // ------------------------------------------------------------------------
- // testing utilities
- // ------------------------------------------------------------------------
-
- private void directlyProvideFreeSlots(
- final SlotManager slotManager,
- final ResourceProfile resourceProfile,
- final int freeSlotNum)
- {
- for (int i = 0; i < freeSlotNum; ++i) {
- slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
- }
- }
-
- // ------------------------------------------------------------------------
- // testing classes
- // ------------------------------------------------------------------------
-
- private static class TestingSlotManager extends SlotManager {
-
- private final List<ResourceProfile> allocatedContainers;
-
- TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
- super(resourceManagerGateway);
- this.allocatedContainers = new LinkedList<>();
- }
-
- /**
- * Choose slot randomly if it matches requirement
- *
- * @param request The slot request
- * @param freeSlots All slots which can be used
- * @return The chosen slot or null if cannot find a match
- */
- @Override
- protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
- for (ResourceSlot slot : freeSlots.values()) {
- if (slot.isMatchingRequirement(request.getResourceProfile())) {
- return slot;
- }
- }
- return null;
- }
-
- /**
- * Choose request randomly if offered slot can match its requirement
- *
- * @param offeredSlot The free slot
- * @param pendingRequests All the pending slot requests
- * @return The chosen request's AllocationID or null if cannot find a match
- */
- @Override
- protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
- Map<AllocationID, SlotRequest> pendingRequests)
- {
- for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
- if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
- return pendingRequest.getValue();
- }
- }
- return null;
- }
-
- @Override
- protected void allocateContainer(ResourceProfile resourceProfile) {
- allocatedContainers.add(resourceProfile);
- }
-
- List<ResourceProfile> getAllocatedContainers() {
- return allocatedContainers;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5894877/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
new file mode 100644
index 0000000..9ee9690
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+ private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+ private static final long DEFAULT_TESTING_MEMORY = 512;
+
+ private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
+
+ private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
+
+ private static TaskExecutorGateway taskExecutorGateway;
+
+ @BeforeClass
+ public static void setUp() {
+ taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
+ }
+
+ /**
+ * Tests that there are no free slots when we request, need to allocate from cluster manager master
+ */
+ @Test
+ public void testRequestSlotWithoutFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that there are some free slots when we request, and the request is fulfilled immediately
+ */
+ @Test
+ public void testRequestSlotWithFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+ assertEquals(1, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertEquals(0, slotManager.getAllocatedContainers().size());
+ }
+
+ /**
+ * Tests that there are some free slots when we request, but none of them are suitable
+ */
+ @Test
+ public void testRequestSlotWithoutSuitableSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
+ assertEquals(2, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that we send duplicated slot request
+ */
+ @Test
+ public void testDuplicatedSlotRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+ slotManager.requestSlot(request1);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request1);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that we send multiple slot requests
+ */
+ @Test
+ public void testRequestMultipleSlots() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
+
+ // request 3 normal slots
+ for (int i = 0; i < 3; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ }
+
+ // request 2 big slots
+ for (int i = 0; i < 2; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ }
+
+ // request 1 normal slot again
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(4, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(2, slotManager.getPendingRequestCount());
+ assertEquals(2, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, but we have no pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+
+ SlotID slotId = SlotID.generate();
+ slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, and it's been reported using by some job
+ */
+ @Test
+ public void testNewlyAppearedInUseSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+
+ SlotID slotId = SlotID.generate();
+ slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that we had a slot in-use, and it's confirmed by SlotReport
+ */
+ @Test
+ public void testExistingInUseSlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+
+ // slot status is confirmed
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
+ request.getJobId(), request.getAllocationId());
+ slotManager.updateSlotStatus(slotStatus2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that we had a slot in-use, but it's empty according to the SlotReport
+ */
+ @Test
+ public void testExistingInUseSlotAdjustedToEmpty() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request1);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request pending
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+
+ // but slot is reported empty again, request2 will be fulfilled, request1 will be missing
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ /**
+ * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
+ * information didn't match.
+ */
+ @Test
+ public void testExistingInUseSlotWithDifferentAllocationInfo() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
+ // update slot status with different allocation info
+ slotManager.updateSlotStatus(slotStatus2);
+
+ // original request is missing and won't be allocated
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+ }
+
+ /**
+ * Tests that we had a free slot, and it's confirmed by SlotReport
+ */
+ @Test
+ public void testExistingEmptySlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ /**
+ * Tests that we had a free slot, and it's reported in-use by TaskManager
+ */
+ @Test
+ public void testExistingEmptySlotAdjustedToInUse() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ final SlotID slotID = SlotID.generate();
+ slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway);
+
+ ResourceSlot slot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
+ new JobID(), new AllocationID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+ }
+
+ /**
+ * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManager() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+
+ /**
+ * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+ final SlotID slotID = SlotID.generate();
+ slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway);
+
+ ResourceSlot slot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // slot is set empty by heartbeat
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request took this slot
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+ // original request should be pended
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ @Test
+ public void testNotifyTaskManagerFailure() {
+ TestingSlotManager slotManager = new TestingSlotManager();
+
+ ResourceID resource1 = ResourceID.generate();
+ ResourceID resource2 = ResourceID.generate();
+
+ ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+ ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+ ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+ ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+
+ slotManager.addFreeSlot(slot11);
+ slotManager.addFreeSlot(slot21);
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.addFreeSlot(slot12);
+ slotManager.addFreeSlot(slot22);
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.notifyTaskManagerFailure(resource2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ // notify an not exist resource failure
+ slotManager.notifyTaskManagerFailure(ResourceID.generate());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ // ------------------------------------------------------------------------
+ // testing utilities
+ // ------------------------------------------------------------------------
+
+ private void directlyProvideFreeSlots(
+ final SlotManager slotManager,
+ final ResourceProfile resourceProfile,
+ final int freeSlotNum)
+ {
+ for (int i = 0; i < freeSlotNum; ++i) {
+ slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorGateway));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // testing classes
+ // ------------------------------------------------------------------------
+
+ private static class TestingSlotManager extends SlotManager {
+
+ private final List<ResourceProfile> allocatedContainers;
+
+ TestingSlotManager() {
+ this.allocatedContainers = new LinkedList<>();
+ }
+
+ /**
+ * Choose slot randomly if it matches requirement
+ *
+ * @param request The slot request
+ * @param freeSlots All slots which can be used
+ * @return The chosen slot or null if cannot find a match
+ */
+ @Override
+ protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+ for (ResourceSlot slot : freeSlots.values()) {
+ if (slot.isMatchingRequirement(request.getResourceProfile())) {
+ return slot;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Choose request randomly if offered slot can match its requirement
+ *
+ * @param offeredSlot The free slot
+ * @param pendingRequests All the pending slot requests
+ * @return The chosen request's AllocationID or null if cannot find a match
+ */
+ @Override
+ protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
+ Map<AllocationID, SlotRequest> pendingRequests)
+ {
+ for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
+ if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
+ return pendingRequest.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected void allocateContainer(ResourceProfile resourceProfile) {
+ allocatedContainers.add(resourceProfile);
+ }
+
+ List<ResourceProfile> getAllocatedContainers() {
+ return allocatedContainers;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5894877/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
new file mode 100644
index 0000000..85d2880
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest extends TestLogger {
+
+ private static TestingSerialRpcService testRpcService;
+
+ @BeforeClass
+ public static void beforeClass() {
+ testRpcService = new TestingSerialRpcService();
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ testRpcService.stopService();
+ testRpcService = null;
+ }
+
+ @Before
+ public void beforeTest(){
+ testRpcService.clearGateways();
+ }
+
+ /**
+ * Tests whether
+ * 1) SlotRequest is routed to the SlotManager
+ * 2) SlotRequest is confirmed
+ * 3) SlotRequest leads to a container allocation
+ * 4) Slot becomes available and TaskExecutor gets a SlotRequest
+ */
+ @Test
+ public void testSlotsUnavailableRequest() throws Exception {
+ final String rmAddress = "/rm1";
+ final String jmAddress = "/jm1";
+ final JobID jobID = new JobID();
+
+ testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+
+
+ TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+ ResourceManager resourceManager =
+ new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+ resourceManager.start();
+
+ Future<RegistrationResponse> registrationFuture =
+ resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+ try {
+ Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ Assert.fail("JobManager registration Future didn't become ready.");
+ }
+
+ final AllocationID allocationID = new AllocationID();
+ final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
+
+ SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
+ SlotRequestReply slotRequestReply =
+ resourceManager.requestSlot(slotRequest);
+
+ // 1) SlotRequest is routed to the SlotManager
+ verify(slotManager).requestSlot(slotRequest);
+
+ // 2) SlotRequest is confirmed
+ Assert.assertEquals(
+ slotRequestReply.getAllocationID(),
+ allocationID);
+
+ // 3) SlotRequest leads to a container allocation
+ verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
+
+ Assert.assertFalse(slotManager.isAllocated(allocationID));
+
+ // slot becomes available
+ final String tmAddress = "/tm1";
+ TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+ final ResourceID resourceID = ResourceID.generate();
+ final SlotID slotID = new SlotID(resourceID, 0);
+
+ final SlotStatus slotStatus =
+ new SlotStatus(slotID, resourceProfile);
+ final SlotReport slotReport =
+ new SlotReport(Collections.singletonList(slotStatus), resourceID);
+ // register slot at SlotManager
+ slotManager.registerTaskExecutor(resourceID, taskExecutorGateway);
+ slotManager.updateSlotStatus(slotReport);
+
+ // 4) Slot becomes available and TaskExecutor gets a SlotRequest
+ verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+ }
+
+ /**
+ * Tests whether
+ * 1) a SlotRequest is routed to the SlotManager
+ * 2) a SlotRequest is confirmed
+ * 3) a SlotRequest leads to an allocation of a registered slot
+ * 4) a SlotRequest is routed to the TaskExecutor
+ */
+ @Test
+ public void testSlotAvailableRequest() throws Exception {
+ final String rmAddress = "/rm1";
+ final String jmAddress = "/jm1";
+ final String tmAddress = "/tm1";
+ final JobID jobID = new JobID();
+
+ testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+
+ TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+ TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+ ResourceManager resourceManager =
+ new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+ resourceManager.start();
+
+ Future<RegistrationResponse> registrationFuture =
+ resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+ try {
+ Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ Assert.fail("JobManager registration Future didn't become ready.");
+ }
+
+ final ResourceID resourceID = ResourceID.generate();
+ final AllocationID allocationID = new AllocationID();
+ final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
+ final SlotID slotID = new SlotID(resourceID, 0);
+
+ final SlotStatus slotStatus =
+ new SlotStatus(slotID, resourceProfile);
+ final SlotReport slotReport =
+ new SlotReport(Collections.singletonList(slotStatus), resourceID);
+ // register slot at SlotManager
+ slotManager.registerTaskExecutor(resourceID, taskExecutorGateway);
+ slotManager.updateSlotStatus(slotReport);
+
+ SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
+ SlotRequestReply slotRequestReply =
+ resourceManager.requestSlot(slotRequest);
+
+ // 1) a SlotRequest is routed to the SlotManager
+ verify(slotManager).requestSlot(slotRequest);
+
+ // 2) a SlotRequest is confirmed
+ Assert.assertEquals(
+ slotRequestReply.getAllocationID(),
+ allocationID);
+
+ // 3) a SlotRequest leads to an allocation of a registered slot
+ Assert.assertTrue(slotManager.isAllocated(slotID));
+ Assert.assertTrue(slotManager.isAllocated(allocationID));
+
+
+ // 4) a SlotRequest is routed to the TaskExecutor
+ verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+ }
+
+
+ private static class TestingSlotManager extends SimpleSlotManager {
+
+ // change visibility of function to public for testing
+ @Override
+ public void allocateContainer(ResourceProfile resourceProfile) {
+ super.allocateContainer(resourceProfile);
+ }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5894877/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 7e92e8d..2212680 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -112,4 +112,8 @@ public class TestingRpcService extends AkkaRpcService {
return Futures.failed(new Exception("No gateway registered under that name"));
}
}
-}
\ No newline at end of file
+
+ public void clearGateways() {
+ registeredConnections.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5894877/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 955edcc..01776ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -137,6 +137,10 @@ public class TestingSerialRpcService implements RpcService {
}
}
+ public void clearGateways() {
+ registeredConnections.clear();
+ }
+
private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
private final T rpcEndpoint;