You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:45:45 UTC
[03/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move
TaskExecutor, JobMaster and ResourceManager out of the rpc package
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
deleted file mode 100644
index 2ee280f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class SlotManagerTest {
-
- private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
-
- private static final long DEFAULT_TESTING_MEMORY = 512;
-
- private static final ResourceProfile DEFAULT_TESTING_PROFILE =
- new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
-
- private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
- new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
-
- private ResourceManagerGateway resourceManagerGateway;
-
- @Before
- public void setUp() {
- resourceManagerGateway = mock(ResourceManagerGateway.class);
- }
-
- /**
- * Tests that there are no free slots when we request, need to allocate from cluster manager master
- */
- @Test
- public void testRequestSlotWithoutFreeSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertEquals(1, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
- }
-
- /**
- * Tests that there are some free slots when we request, and the request is fulfilled immediately
- */
- @Test
- public void testRequestSlotWithFreeSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
- assertEquals(1, slotManager.getFreeSlotCount());
-
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertEquals(0, slotManager.getAllocatedContainers().size());
- }
-
- /**
- * Tests that there are some free slots when we request, but none of them are suitable
- */
- @Test
- public void testRequestSlotWithoutSuitableSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
- assertEquals(2, slotManager.getFreeSlotCount());
-
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(2, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertEquals(1, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
- }
-
- /**
- * Tests that we send duplicated slot request
- */
- @Test
- public void testDuplicatedSlotRequest() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-
- SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
- slotManager.requestSlot(request1);
- slotManager.requestSlot(request2);
- slotManager.requestSlot(request2);
- slotManager.requestSlot(request1);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertEquals(1, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
- }
-
- /**
- * Tests that we send multiple slot requests
- */
- @Test
- public void testRequestMultipleSlots() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
-
- // request 3 normal slots
- for (int i = 0; i < 3; ++i) {
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- }
-
- // request 2 big slots
- for (int i = 0; i < 2; ++i) {
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
- }
-
- // request 1 normal slot again
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
- assertEquals(4, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(2, slotManager.getPendingRequestCount());
- assertEquals(2, slotManager.getAllocatedContainers().size());
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
- assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
- */
- @Test
- public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- assertEquals(1, slotManager.getPendingRequestCount());
-
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, but we have no pending request
- */
- @Test
- public void testNewlyAppearedFreeSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests
- */
- @Test
- public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
- assertEquals(1, slotManager.getPendingRequestCount());
-
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertFalse(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that a new slot appeared in SlotReport, and it's been reported using by some job
- */
- @Test
- public void testNewlyAppearedInUseSlot() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertTrue(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that we had a slot in-use, and it's confirmed by SlotReport
- */
- @Test
- public void testExistingInUseSlotUpdateStatus() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- // make this slot in use
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertTrue(slotManager.isAllocated(slotId));
-
- // slot status is confirmed
- SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
- request.getAllocationId(), request.getJobId());
- slotManager.updateSlotStatus(slotStatus2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertTrue(slotManager.isAllocated(slotId));
- }
-
- /**
- * Tests that we had a slot in-use, but it's empty according to the SlotReport
- */
- @Test
- public void testExistingInUseSlotAdjustedToEmpty() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request1);
-
- // make this slot in use
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- // another request pending
- SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertTrue(slotManager.isAllocated(request1.getAllocationId()));
-
-
- // but slot is reported empty again, request2 will be fulfilled, request1 will be missing
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertTrue(slotManager.isAllocated(request2.getAllocationId()));
- }
-
- /**
- * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
- * information didn't match.
- */
- @Test
- public void testExistingInUseSlotWithDifferentAllocationInfo() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- // make this slot in use
- SlotID slotId = SlotID.generate();
- SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
- SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
- // update slot status with different allocation info
- slotManager.updateSlotStatus(slotStatus2);
-
- // original request is missing and won't be allocated
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slotId));
- assertFalse(slotManager.isAllocated(request.getAllocationId()));
- assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
- }
-
- /**
- * Tests that we had a free slot, and it's confirmed by SlotReport
- */
- @Test
- public void testExistingEmptySlotUpdateStatus() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(0, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- }
-
- /**
- * Tests that we had a free slot, and it's reported in-use by TaskManager
- */
- @Test
- public void testExistingEmptySlotAdjustedToInUse() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
- new AllocationID(), new JobID());
- slotManager.updateSlotStatus(slotStatus);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slot.getSlotId()));
- }
-
- /**
- * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
- */
- @Test
- public void testSlotAllocationFailedAtTaskManager() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertTrue(slotManager.isAllocated(slot.getSlotId()));
-
- slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- }
-
-
- /**
- * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
- */
- @Test
- public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
- ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
- slotManager.addFreeSlot(slot);
-
- SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request);
-
- // slot is set empty by heartbeat
- SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
- slotManager.updateSlotStatus(slotStatus);
-
- // another request took this slot
- SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
- slotManager.requestSlot(request2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- assertFalse(slotManager.isAllocated(request.getAllocationId()));
- assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
- // original request should be pended
- slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(1, slotManager.getPendingRequestCount());
- assertFalse(slotManager.isAllocated(request.getAllocationId()));
- assertTrue(slotManager.isAllocated(request2.getAllocationId()));
- }
-
- @Test
- public void testNotifyTaskManagerFailure() {
- TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
- ResourceID resource1 = ResourceID.generate();
- ResourceID resource2 = ResourceID.generate();
-
- ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
- ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
- ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
- ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
-
- slotManager.addFreeSlot(slot11);
- slotManager.addFreeSlot(slot21);
-
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
- slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
- assertEquals(2, slotManager.getAllocatedSlotCount());
- assertEquals(0, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
-
- slotManager.addFreeSlot(slot12);
- slotManager.addFreeSlot(slot22);
-
- assertEquals(2, slotManager.getAllocatedSlotCount());
- assertEquals(2, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
-
- slotManager.notifyTaskManagerFailure(resource2);
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
-
- // notify an not exist resource failure
- slotManager.notifyTaskManagerFailure(ResourceID.generate());
-
- assertEquals(1, slotManager.getAllocatedSlotCount());
- assertEquals(1, slotManager.getFreeSlotCount());
- assertEquals(0, slotManager.getPendingRequestCount());
- }
-
- // ------------------------------------------------------------------------
- // testing utilities
- // ------------------------------------------------------------------------
-
- private void directlyProvideFreeSlots(
- final SlotManager slotManager,
- final ResourceProfile resourceProfile,
- final int freeSlotNum)
- {
- for (int i = 0; i < freeSlotNum; ++i) {
- slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
- }
- }
-
- // ------------------------------------------------------------------------
- // testing classes
- // ------------------------------------------------------------------------
-
- private static class TestingSlotManager extends SlotManager {
-
- private final List<ResourceProfile> allocatedContainers;
-
- TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
- super(resourceManagerGateway);
- this.allocatedContainers = new LinkedList<>();
- }
-
- /**
- * Choose slot randomly if it matches requirement
- *
- * @param request The slot request
- * @param freeSlots All slots which can be used
- * @return The chosen slot or null if cannot find a match
- */
- @Override
- protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
- for (ResourceSlot slot : freeSlots.values()) {
- if (slot.isMatchingRequirement(request.getResourceProfile())) {
- return slot;
- }
- }
- return null;
- }
-
- /**
- * Choose request randomly if offered slot can match its requirement
- *
- * @param offeredSlot The free slot
- * @param pendingRequests All the pending slot requests
- * @return The chosen request's AllocationID or null if cannot find a match
- */
- @Override
- protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
- Map<AllocationID, SlotRequest> pendingRequests)
- {
- for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
- if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
- return pendingRequest.getValue();
- }
- }
- return null;
- }
-
- @Override
- protected void allocateContainer(ResourceProfile resourceProfile) {
- allocatedContainers.add(resourceProfile);
- }
-
- List<ResourceProfile> getAllocatedContainers() {
- return allocatedContainers;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
new file mode 100644
index 0000000..80fa19c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior.
+ */
+public class RetryingRegistrationTest extends TestLogger {
+
+ @Test
+ public void testSimpleSuccessfulRegistration() throws Exception {
+ final String testId = "laissez les bon temps roulez";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ // an endpoint that immediately returns success
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ assertNotNull(future);
+
+ // multiple accesses return the same future
+ assertEquals(future, registration.getFuture());
+
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+ }
+ finally {
+ testGateway.stop();
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ public void testPropagateFailures() throws Exception {
+ final String testExceptionMessage = "testExceptionMessage";
+
+ // RPC service whose connect() call throws an exception instead of returning a future
+ RpcService rpc = mock(RpcService.class);
+ when(rpc.connect(anyString(), any(Class.class))).thenThrow(new RuntimeException(testExceptionMessage));
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
+ registration.startRegistration();
+
+ Future<?> future = registration.getFuture();
+ assertTrue(future.failed().isCompleted());
+
+ assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage());
+ }
+
+ @Test
+ public void testRetryConnectOnFailure() throws Exception {
+ final String testId = "laissez les bon temps roulez";
+ final UUID leaderId = UUID.randomUUID();
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
+
+ try {
+ // RPC service that fails upon the first connection, but succeeds on the second
+ RpcService rpc = mock(RpcService.class);
+ when(rpc.connect(anyString(), any(Class.class))).thenReturn(
+ Futures.failed(new Exception("test connect failure")), // first connection attempt fails
+ Futures.successful(testGateway) // second connection attempt succeeds
+ );
+ when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
+ registration.startRegistration();
+
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS));
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+ }
+ finally {
+ testGateway.stop();
+ executor.shutdown();
+ }
+ }
+
+ @Test
+ public void testRetriesOnTimeouts() throws Exception {
+ final String testId = "rien ne va plus";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ // an endpoint whose first two calls return futures that never complete (and thus time out), and whose third call succeeds
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(
+ null, // timeout
+ null, // timeout
+ new TestRegistrationSuccess(testId) // success
+ );
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+ long started = System.nanoTime();
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ long finished = System.nanoTime();
+ long elapsedMillis = (finished - started) / 1000000;
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+
+ // validate that some retry-delay / back-off behavior happened
+ assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
+ }
+ finally {
+ rpc.stopService();
+ testGateway.stop();
+ }
+ }
+
+ @Test
+ public void testDecline() throws Exception {
+ final String testId = "qui a coupe le fromage";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(
+ null, // timeout
+ new RegistrationResponse.Decline("no reason "),
+ null, // timeout
+ new TestRegistrationSuccess(testId) // success
+ );
+
+ try {
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+ long started = System.nanoTime();
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ long finished = System.nanoTime();
+ long elapsedMillis = (finished - started) / 1000000;
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+
+ // validate that some retry-delay / back-off behavior happened
+ assertTrue("retries did not properly back off", elapsedMillis >=
+ 2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE);
+ }
+ finally {
+ testGateway.stop();
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testRetryOnError() throws Exception {
+ final String testId = "Petit a petit, l'oiseau fait son nid";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ // gateway whose first call responds with a failure and whose subsequent calls respond with a success
+ TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+
+ when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
+ Futures.<RegistrationResponse>failed(new Exception("test exception")),
+ Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
+
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+ long started = System.nanoTime();
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ long finished = System.nanoTime();
+ long elapsedMillis = (finished - started) / 1000000;
+
+ assertEquals(testId, success.f1.getCorrelationId());
+
+ // validate that some retry-delay / back-off behavior happened
+ assertTrue("retries did not properly back off",
+ elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR);
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ public void testCancellation() throws Exception {
+ final String testEndpointAddress = "my-test-address";
+ final UUID leaderId = UUID.randomUUID();
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ Promise<RegistrationResponse> result = Futures.promise();
+
+ TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+ when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future());
+
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+ registration.startRegistration();
+
+ // cancel and fail the current registration attempt
+ registration.cancel();
+ result.failure(new TimeoutException());
+
+ // there should not be a second registration attempt
+ verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong());
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test registration
+ // ------------------------------------------------------------------------
+
+ private static class TestRegistrationSuccess extends RegistrationResponse.Success {
+ private static final long serialVersionUID = 5542698790917150604L;
+
+ private final String correlationId;
+
+ private TestRegistrationSuccess(String correlationId) {
+ this.correlationId = correlationId;
+ }
+
+ public String getCorrelationId() {
+ return correlationId;
+ }
+ }
+
+ private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+
+ // we use shorter timeouts here to speed up the tests
+ static final long INITIAL_TIMEOUT = 20;
+ static final long MAX_TIMEOUT = 200;
+ static final long DELAY_ON_ERROR = 200;
+ static final long DELAY_ON_DECLINE = 200;
+
+ public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
+ super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
+ rpc, "TestEndpoint",
+ TestRegistrationGateway.class,
+ targetAddress, leaderId,
+ INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE);
+ }
+
+ @Override
+ protected Future<RegistrationResponse> invokeRegistration(
+ TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
+ return gateway.registrationCall(leaderId, timeoutMillis);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
new file mode 100644
index 0000000..431fbe8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.runtime.rpc.TestingGatewayBase;
+import org.apache.flink.util.Preconditions;
+
+import scala.concurrent.Future;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestRegistrationGateway extends TestingGatewayBase {
+
+ private final BlockingQueue<RegistrationCall> invocations;
+
+ private final RegistrationResponse[] responses;
+
+ private int pos;
+
+ public TestRegistrationGateway(RegistrationResponse... responses) {
+ Preconditions.checkArgument(responses != null && responses.length > 0);
+
+ this.invocations = new LinkedBlockingQueue<>();
+ this.responses = responses;
+
+ }
+
+ // ------------------------------------------------------------------------
+
+ public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+ invocations.add(new RegistrationCall(leaderId, timeout));
+
+ RegistrationResponse response = responses[pos];
+ if (pos < responses.length - 1) {
+ pos++;
+ }
+
+ // return a completed future (for a proper value), or one that never completes and will time out (for null)
+ return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+ }
+
+ public BlockingQueue<RegistrationCall> getInvocations() {
+ return invocations;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static class RegistrationCall {
+ private final UUID leaderId;
+ private final long timeout;
+
+ public RegistrationCall(UUID leaderId, long timeout) {
+ this.leaderId = leaderId;
+ this.timeout = timeout;
+ }
+
+ public UUID leaderId() {
+ return leaderId;
+ }
+
+ public long timeout() {
+ return timeout;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
deleted file mode 100644
index 32c6cac..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.Option;
-
-
-/**
- * Runs tests to ensure that a cluster is shutdown properly.
- */
-public class ClusterShutdownITCase extends TestLogger {
-
- private static ActorSystem system;
-
- private static Configuration config = new Configuration();
-
- @BeforeClass
- public static void setup() {
- system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- /**
- * Tests a faked cluster shutdown procedure without the ResourceManager.
- */
- @Test
- public void testClusterShutdownWithoutResourceManager() {
-
- new JavaTestKit(system){{
- new Within(duration("30 seconds")) {
- @Override
- protected void run() {
-
- ActorGateway me =
- TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
- // start job manager which doesn't shutdown the actor system
- ActorGateway jobManager =
- TestingUtils.createJobManager(system, config, "jobmanager1");
-
- // Tell the JobManager to inform us of shutdown actions
- jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
- // Register a TaskManager
- ActorGateway taskManager =
- TestingUtils.createTaskManager(system, jobManager, config, true, true);
-
- // Tell the TaskManager to inform us of TaskManager shutdowns
- taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-
- // No resource manager connected
- jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
- expectMsgAllOf(
- new TestingMessages.ComponentShutdown(taskManager.actor()),
- new TestingMessages.ComponentShutdown(jobManager.actor()),
- StopClusterSuccessful.getInstance()
- );
-
- }};
- }};
- }
-
- /**
- * Tests a faked cluster shutdown procedure with the ResourceManager.
- */
- @Test
- public void testClusterShutdownWithResourceManager() {
-
- new JavaTestKit(system){{
- new Within(duration("30 seconds")) {
- @Override
- protected void run() {
-
- ActorGateway me =
- TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
- // start job manager which doesn't shutdown the actor system
- ActorGateway jobManager =
- TestingUtils.createJobManager(system, config, "jobmanager2");
-
- // Tell the JobManager to inform us of shutdown actions
- jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
- // Register a TaskManager
- ActorGateway taskManager =
- TestingUtils.createTaskManager(system, jobManager, config, true, true);
-
- // Tell the TaskManager to inform us of TaskManager shutdowns
- taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
- // Start resource manager and let it register
- ActorGateway resourceManager =
- TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
- // Tell the ResourceManager to inform us of ResourceManager shutdowns
- resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
- // notify about a resource manager registration at the job manager
- resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
- // Wait for resource manager
- expectMsgEquals(Messages.getAcknowledge());
-
-
- // Shutdown cluster with resource manager connected
- jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
- expectMsgAllOf(
- new TestingMessages.ComponentShutdown(taskManager.actor()),
- new TestingMessages.ComponentShutdown(jobManager.actor()),
- new TestingMessages.ComponentShutdown(resourceManager.actor()),
- StopClusterSuccessful.getInstance()
- );
-
- }};
- }};
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
new file mode 100644
index 0000000..5799e62
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * ResourceManager HA test, covering the granting and revoking of leadership.
+ */
+public class ResourceManagerHATest {
+
+ @Test
+ public void testGrantAndRevokeLeadership() throws Exception {
+ // mock an RpcService whose startServer method returns a special RpcGateway; that gateway executes runAsync calls directly
+ TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
+ doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
+
+ RpcService rpcService = mock(RpcService.class);
+ when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+
+ TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+ TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+ highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+ final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+ resourceManager.start();
+ // before leadership is granted, the resourceManager's leaderId is null
+ Assert.assertNull(resourceManager.getLeaderSessionID());
+ final UUID leaderId = UUID.randomUUID();
+ leaderElectionService.isLeader(leaderId);
+ // after leadership is granted, the resourceManager's leaderId equals the granted leader ID
+ Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
+ // after leadership is revoked, the resourceManager's leaderId is null again
+ leaderElectionService.notLeader();
+ Assert.assertNull(resourceManager.getLeaderSessionID());
+ }
+
+ private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
+ @Override
+ public void runAsync(Runnable runnable) {
+ runnable.run();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
deleted file mode 100644
index 0c2ca1a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import scala.Option;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * It cases which test the interaction of the resource manager with job manager and task managers.
- * Runs all tests in one Actor system.
- */
-public class ResourceManagerITCase extends TestLogger {
-
- private static ActorSystem system;
-
- private static Configuration config = new Configuration();
-
- @BeforeClass
- public static void setup() {
- system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- /**
- * Tests whether the resource manager connects and reconciles existing task managers.
- */
- @Test
- public void testResourceManagerReconciliation() {
-
- new JavaTestKit(system){{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- ActorGateway jobManager =
- TestingUtils.createJobManager(system, config, "ReconciliationTest");
- ActorGateway me =
- TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
- // !! no resource manager started !!
-
- ResourceID resourceID = ResourceID.generate();
-
- TaskManagerLocation location = mock(TaskManagerLocation.class);
- when(location.getResourceID()).thenReturn(resourceID);
-
- HardwareDescription resourceProfile = HardwareDescription.extractFromSystem(1_000_000);
-
- jobManager.tell(
- new RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 1),
- me);
-
- expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
-
- // now start the resource manager
- ActorGateway resourceManager =
- TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
- // register at testing job manager to receive a message once a resource manager registers
- resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
- // Wait for resource manager
- expectMsgEquals(Messages.getAcknowledge());
-
- // check if we registered the task manager resource
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
-
- TestingResourceManager.GetRegisteredResourcesReply reply =
- expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(1, reply.resources.size());
- assertTrue(reply.resources.contains(resourceID));
-
- }};
- }};
- }
-
- /**
- * Tests whether the resource manager gets informed upon TaskManager registration.
- */
- @Test
- public void testResourceManagerTaskManagerRegistration() {
-
- new JavaTestKit(system){{
- new Within(duration("30 seconds")) {
- @Override
- protected void run() {
-
- ActorGateway jobManager =
- TestingUtils.createJobManager(system, config, "RegTest");
- ActorGateway me =
- TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
- // start the resource manager
- ActorGateway resourceManager =
- TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
- // notify about a resource manager registration at the job manager
- resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
- // Wait for resource manager
- expectMsgEquals(Messages.getAcknowledge());
-
- // start task manager and wait for registration
- ActorGateway taskManager =
- TestingUtils.createTaskManager(system, jobManager.actor(), config, true, true);
-
- // check if we registered the task manager resource
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
-
- TestingResourceManager.GetRegisteredResourcesReply reply =
- expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(1, reply.resources.size());
-
- }};
- }};
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
deleted file mode 100644
index 043c81c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
-import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
-import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * General tests for the resource manager component.
- */
-public class ResourceManagerTest {
-
- private static ActorSystem system;
-
- private static ActorGateway fakeJobManager;
- private static ActorGateway resourceManager;
-
- private static Configuration config = new Configuration();
-
- @BeforeClass
- public static void setup() {
- system = AkkaUtils.createLocalActorSystem(config);
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- /**
- * Tests the registration and reconciliation of the ResourceManager with the JobManager
- */
- @Test
- public void testJobManagerRegistrationAndReconciliation() {
- new JavaTestKit(system){{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
- resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
- expectMsgClass(RegisterResourceManager.class);
-
- List<ResourceID> resourceList = new ArrayList<>();
- resourceList.add(ResourceID.generate());
- resourceList.add(ResourceID.generate());
- resourceList.add(ResourceID.generate());
-
- resourceManager.tell(
- new RegisterResourceManagerSuccessful(fakeJobManager.actor(), resourceList),
- fakeJobManager);
-
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
- TestingResourceManager.GetRegisteredResourcesReply reply =
- expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- for (ResourceID id : resourceList) {
- if (!reply.resources.contains(id)) {
- fail("Expected to find all resources that were provided during registration.");
- }
- }
- }};
- }};
- }
-
- /**
- * Tests delayed or erroneous registration of the ResourceManager with the JobManager
- */
- @Test
- public void testDelayedJobManagerRegistration() {
- new JavaTestKit(system){{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- // set a short timeout for lookups
- Configuration shortTimeoutConfig = config.clone();
- shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
-
- fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
- resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
-
- // wait for registration message
- RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
- // give wrong response
- getLastSender().tell(new JobManagerMessages.LeaderSessionMessage(null, new Object()),
- fakeJobManager.actor());
-
- // expect another retry and let it time out
- expectMsgClass(RegisterResourceManager.class);
-
- // wait for next try after timeout
- expectMsgClass(RegisterResourceManager.class);
-
- }};
- }};
- }
-
- @Test
- public void testTriggerReconnect() {
- new JavaTestKit(system){{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- // set a long timeout for lookups such that the test fails in case of timeouts
- Configuration shortTimeoutConfig = config.clone();
- shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
-
- fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
- resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
-
- // wait for registration message
- RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
- // all went well
- resourceManager.tell(
- new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
- fakeJobManager);
-
- // force a reconnect
- resourceManager.tell(
- new TriggerRegistrationAtJobManager(fakeJobManager.actor()),
- fakeJobManager);
-
- // new registration attempt should come in
- expectMsgClass(RegisterResourceManager.class);
-
- }};
- }};
- }
-
- /**
- * Tests the registration and accounting of resources at the ResourceManager.
- */
- @Test
- public void testTaskManagerRegistration() {
- new JavaTestKit(system){{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
- resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
- // register with JM
- expectMsgClass(RegisterResourceManager.class);
- resourceManager.tell(
- new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
- fakeJobManager);
-
- ResourceID resourceID = ResourceID.generate();
-
- // Send task manager registration
- resourceManager.tell(new NotifyResourceStarted(resourceID),
- fakeJobManager);
-
- expectMsgClass(Acknowledge.class);
-
- // check for number registration of registered resources
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
- TestingResourceManager.GetRegisteredResourcesReply reply =
- expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(1, reply.resources.size());
-
- // Send task manager registration again
- resourceManager.tell(new NotifyResourceStarted(resourceID),
- fakeJobManager);
-
- expectMsgClass(Acknowledge.class);
-
- // check for number registration of registered resources
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
- reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(1, reply.resources.size());
-
- // Send invalid null resource id to throw an exception during resource registration
- resourceManager.tell(new NotifyResourceStarted(null),
- fakeJobManager);
-
- expectMsgClass(Acknowledge.class);
-
- // check for number registration of registered resources
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
- reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(1, reply.resources.size());
- }};
- }};
- }
-
- @Test
- public void testResourceRemoval() {
- new JavaTestKit(system){{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
- resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
- // register with JM
- expectMsgClass(RegisterResourceManager.class);
- resourceManager.tell(
- new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
- fakeJobManager);
-
- ResourceID resourceID = ResourceID.generate();
-
- // remove unknown resource
- resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
-
- // Send task manager registration
- resourceManager.tell(new NotifyResourceStarted(resourceID),
- fakeJobManager);
-
- expectMsgClass(Acknowledge.class);
-
- // check for number registration of registered resources
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
- TestingResourceManager.GetRegisteredResourcesReply reply =
- expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(1, reply.resources.size());
- assertTrue(reply.resources.contains(resourceID));
-
- // remove resource
- resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
-
- // check for number registration of registered resources
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
- reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(0, reply.resources.size());
-
- }};
- }};
- }
-
- /**
- * Tests notification of JobManager about a failed resource.
- */
- @Test
- public void testResourceFailureNotification() {
- new JavaTestKit(system){{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
-
- fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
- resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
- // register with JM
- expectMsgClass(RegisterResourceManager.class);
- resourceManager.tell(
- new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
- fakeJobManager);
-
- ResourceID resourceID1 = ResourceID.generate();
- ResourceID resourceID2 = ResourceID.generate();
-
- // Send task manager registration
- resourceManager.tell(new NotifyResourceStarted(resourceID1),
- fakeJobManager);
-
- expectMsgClass(Acknowledge.class);
-
- // Send task manager registration
- resourceManager.tell(new NotifyResourceStarted(resourceID2),
- fakeJobManager);
-
- expectMsgClass(Acknowledge.class);
-
- // check for number registration of registered resources
- resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
- TestingResourceManager.GetRegisteredResourcesReply reply =
- expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
- assertEquals(2, reply.resources.size());
- assertTrue(reply.resources.contains(resourceID1));
- assertTrue(reply.resources.contains(resourceID2));
-
- // fail resources
- resourceManager.tell(new TestingResourceManager.FailResource(resourceID1), fakeJobManager);
- resourceManager.tell(new TestingResourceManager.FailResource(resourceID2), fakeJobManager);
-
- ResourceRemoved answer = expectMsgClass(ResourceRemoved.class);
- ResourceRemoved answer2 = expectMsgClass(ResourceRemoved.class);
-
- assertEquals(resourceID1, answer.resourceId());
- assertEquals(resourceID2, answer2.resourceId());
-
- }};
- }};
- }
-}