You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:45:44 UTC
[02/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move
TaskExecutor, JobMaster and ResourceManager out of the rpc package
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
new file mode 100644
index 0000000..52d9d06
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+ private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+ private static final long DEFAULT_TESTING_MEMORY = 512;
+
+ private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
+
+ private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
+
+ private ResourceManagerGateway resourceManagerGateway;
+
+ @Before
+ public void setUp() {
+ resourceManagerGateway = mock(ResourceManagerGateway.class);
+ }
+
+ /**
+ * Tests that a request made while there are no free slots stays pending and triggers a container allocation
+ */
+ @Test
+ public void testRequestSlotWithoutFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that there are some free slots when we request, and the request is fulfilled immediately
+ */
+ @Test
+ public void testRequestSlotWithFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+ assertEquals(1, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertEquals(0, slotManager.getAllocatedContainers().size());
+ }
+
+ /**
+ * Tests that there are some free slots when we request, but none of them are suitable
+ */
+ @Test
+ public void testRequestSlotWithoutSuitableSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
+ assertEquals(2, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that sending the same slot request more than once does not lead to duplicate allocations
+ */
+ @Test
+ public void testDuplicatedSlotRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+ slotManager.requestSlot(request1);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request1);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that we send multiple slot requests
+ */
+ @Test
+ public void testRequestMultipleSlots() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
+
+ // request 3 normal slots
+ for (int i = 0; i < 3; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ }
+
+ // request 2 big slots
+ for (int i = 0; i < 2; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ }
+
+ // request 1 normal slot again
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(4, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(2, slotManager.getPendingRequestCount());
+ assertEquals(2, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, but we have no pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, but it is not suitable for any of the pending requests
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, and it is reported as being in use by some job
+ */
+ @Test
+ public void testNewlyAppearedInUseSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that we had a slot in-use, and it's confirmed by SlotReport
+ */
+ @Test
+ public void testExistingInUseSlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+
+ // slot status is confirmed
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
+ request.getAllocationId(), request.getJobId());
+ slotManager.updateSlotStatus(slotStatus2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that we had a slot in-use, but it's empty according to the SlotReport
+ */
+ @Test
+ public void testExistingInUseSlotAdjustedToEmpty() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request1);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request pending
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+
+ // but slot is reported empty again, request2 will be fulfilled, request1 will be missing
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ /**
+ * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
+ * information didn't match.
+ */
+ @Test
+ public void testExistingInUseSlotWithDifferentAllocationInfo() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+ // update slot status with different allocation info
+ slotManager.updateSlotStatus(slotStatus2);
+
+ // original request is missing and won't be allocated
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+ }
+
+ /**
+ * Tests that we had a free slot, and it's confirmed by SlotReport
+ */
+ @Test
+ public void testExistingEmptySlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ /**
+ * Tests that we had a free slot, and it's reported in-use by TaskManager
+ */
+ @Test
+ public void testExistingEmptySlotAdjustedToInUse() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
+ new AllocationID(), new JobID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+ }
+
+ /**
+ * Tests that an allocation which failed / was rejected at the TaskManager causes the request to be retried
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManager() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+
+ /**
+ * Tests that an allocation failed / was rejected at the TaskManager while the slot is already occupied by another request
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // slot is set empty by heartbeat
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request took this slot
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+ // original request should become pending again
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ @Test
+ public void testNotifyTaskManagerFailure() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ ResourceID resource1 = ResourceID.generate();
+ ResourceID resource2 = ResourceID.generate();
+
+ ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
+
+ slotManager.addFreeSlot(slot11);
+ slotManager.addFreeSlot(slot21);
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.addFreeSlot(slot12);
+ slotManager.addFreeSlot(slot22);
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.notifyTaskManagerFailure(resource2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ // notify the failure of a resource that does not exist
+ slotManager.notifyTaskManagerFailure(ResourceID.generate());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ // ------------------------------------------------------------------------
+ // testing utilities
+ // ------------------------------------------------------------------------
+
+ private void directlyProvideFreeSlots(
+ final SlotManager slotManager,
+ final ResourceProfile resourceProfile,
+ final int freeSlotNum)
+ {
+ for (int i = 0; i < freeSlotNum; ++i) {
+ slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // testing classes
+ // ------------------------------------------------------------------------
+
+ private static class TestingSlotManager extends SlotManager {
+
+ private final List<ResourceProfile> allocatedContainers;
+
+ TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
+ super(resourceManagerGateway);
+ this.allocatedContainers = new LinkedList<>();
+ }
+
+ /**
+ * Choose the first free slot (in iteration order) that matches the requirement
+ *
+ * @param request The slot request
+ * @param freeSlots All slots which can be used
+ * @return The chosen slot or null if cannot find a match
+ */
+ @Override
+ protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+ for (ResourceSlot slot : freeSlots.values()) {
+ if (slot.isMatchingRequirement(request.getResourceProfile())) {
+ return slot;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Choose the first pending request (in iteration order) whose requirement the offered slot can match
+ *
+ * @param offeredSlot The free slot
+ * @param pendingRequests All the pending slot requests
+ * @return The chosen request or null if no match can be found
+ */
+ @Override
+ protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
+ Map<AllocationID, SlotRequest> pendingRequests)
+ {
+ for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
+ if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
+ return pendingRequest.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected void allocateContainer(ResourceProfile resourceProfile) {
+ allocatedContainers.add(resourceProfile);
+ }
+
+ List<ResourceProfile> getAllocatedContainers() {
+ return allocatedContainers;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 2790cf8..f55069e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -21,28 +21,14 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import akka.util.Timeout;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
-import org.mockito.Mockito;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AkkaRpcServiceTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
deleted file mode 100644
index 9508825..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.Futures;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior.
- */
-public class RetryingRegistrationTest extends TestLogger {
-
- @Test
- public void testSimpleSuccessfulRegistration() throws Exception {
- final String testId = "laissez les bon temps roulez";
- final String testEndpointAddress = "<test-address>";
- final UUID leaderId = UUID.randomUUID();
-
- // an endpoint that immediately returns success
- TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
- TestingRpcService rpc = new TestingRpcService();
-
- try {
- rpc.registerGateway(testEndpointAddress, testGateway);
-
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
- registration.startRegistration();
-
- Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
- assertNotNull(future);
-
- // multiple accesses return the same future
- assertEquals(future, registration.getFuture());
-
- Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS));
-
- // validate correct invocation and result
- assertEquals(testId, success.f1.getCorrelationId());
- assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
- }
- finally {
- testGateway.stop();
- rpc.stopService();
- }
- }
-
- @Test
- public void testPropagateFailures() throws Exception {
- final String testExceptionMessage = "testExceptionMessage";
-
- // RPC service that fails with exception upon the connection
- RpcService rpc = mock(RpcService.class);
- when(rpc.connect(anyString(), any(Class.class))).thenThrow(new RuntimeException(testExceptionMessage));
-
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
- registration.startRegistration();
-
- Future<?> future = registration.getFuture();
- assertTrue(future.failed().isCompleted());
-
- assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage());
- }
-
- @Test
- public void testRetryConnectOnFailure() throws Exception {
- final String testId = "laissez les bon temps roulez";
- final UUID leaderId = UUID.randomUUID();
-
- ExecutorService executor = Executors.newCachedThreadPool();
- TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
-
- try {
- // RPC service that fails upon the first connection, but succeeds on the second
- RpcService rpc = mock(RpcService.class);
- when(rpc.connect(anyString(), any(Class.class))).thenReturn(
- Futures.failed(new Exception("test connect failure")), // first connection attempt fails
- Futures.successful(testGateway) // second connection attempt succeeds
- );
- when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
-
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
- registration.startRegistration();
-
- Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS));
-
- // validate correct invocation and result
- assertEquals(testId, success.f1.getCorrelationId());
- assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
- }
- finally {
- testGateway.stop();
- executor.shutdown();
- }
- }
-
- @Test
- public void testRetriesOnTimeouts() throws Exception {
- final String testId = "rien ne va plus";
- final String testEndpointAddress = "<test-address>";
- final UUID leaderId = UUID.randomUUID();
-
- // an endpoint that immediately returns futures with timeouts before returning a successful future
- TestRegistrationGateway testGateway = new TestRegistrationGateway(
- null, // timeout
- null, // timeout
- new TestRegistrationSuccess(testId) // success
- );
-
- TestingRpcService rpc = new TestingRpcService();
-
- try {
- rpc.registerGateway(testEndpointAddress, testGateway);
-
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
- long started = System.nanoTime();
- registration.startRegistration();
-
- Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
- Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS));
-
- long finished = System.nanoTime();
- long elapsedMillis = (finished - started) / 1000000;
-
- // validate correct invocation and result
- assertEquals(testId, success.f1.getCorrelationId());
- assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
-
- // validate that some retry-delay / back-off behavior happened
- assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
- }
- finally {
- rpc.stopService();
- testGateway.stop();
- }
- }
-
- @Test
- public void testDecline() throws Exception { // registration must still succeed after timeouts and a decline
- final String testId = "qui a coupe le fromage";
- final String testEndpointAddress = "<test-address>";
- final UUID leaderId = UUID.randomUUID();
-
- TestingRpcService rpc = new TestingRpcService();
-
- TestRegistrationGateway testGateway = new TestRegistrationGateway( // scripted replies, consumed one per registration call
- null, // no reply: this attempt times out
- new RegistrationResponse.Decline("no reason "), // explicit decline
- null, // no reply: this attempt times out
- new TestRegistrationSuccess(testId) // success
- );
-
- try {
- rpc.registerGateway(testEndpointAddress, testGateway);
-
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
- long started = System.nanoTime();
- registration.startRegistration();
-
- Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
- Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS)); // block for at most 10 seconds
-
- long finished = System.nanoTime();
- long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
-
- // validate correct invocation and result
- assertEquals(testId, success.f1.getCorrelationId());
- assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); // leader id of the first recorded call
-
- // validate that back-off happened: lower bound is two timed-out attempts (the null replies) plus one decline delay
- assertTrue("retries did not properly back off", elapsedMillis >=
- 2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE);
- }
- finally {
- testGateway.stop();
- rpc.stopService();
- }
- }
-
- @Test
- @SuppressWarnings("unchecked") // NOTE(review): presumably needed for the generic futures passed to thenReturn(...) -- confirm
- public void testRetryOnError() throws Exception { // a failed attempt must be retried and eventually succeed
- final String testId = "Petit a petit, l'oiseau fait son nid";
- final String testEndpointAddress = "<test-address>";
- final UUID leaderId = UUID.randomUUID();
-
- TestingRpcService rpc = new TestingRpcService();
-
- try {
- // gateway that upon calls first responds with a failure, then with a success
- TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
-
- when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
- Futures.<RegistrationResponse>failed(new Exception("test exception")), // first call: failed future
- Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId))); // later calls: success
-
- rpc.registerGateway(testEndpointAddress, testGateway);
-
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
- long started = System.nanoTime();
- registration.startRegistration();
-
- Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
- Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS)); // block for at most 10 seconds
-
- long finished = System.nanoTime();
- long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
-
- assertEquals(testId, success.f1.getCorrelationId());
-
- // validate that back-off happened: the retry must not come earlier than the configured error delay
- assertTrue("retries did not properly back off",
- elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR);
- }
- finally {
- rpc.stopService();
- }
- }
-
- @Test
- public void testCancellation() throws Exception { // after cancel(), a failing attempt must not trigger a retry
- final String testEndpointAddress = "my-test-address";
- final UUID leaderId = UUID.randomUUID();
-
- TestingRpcService rpc = new TestingRpcService();
-
- try {
- Promise<RegistrationResponse> result = Futures.promise(); // stays incomplete until it is failed explicitly below
-
- TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
- when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future()); // every call gets the pending future
-
- rpc.registerGateway(testEndpointAddress, testGateway);
-
- TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
- registration.startRegistration();
-
- // cancel first, then fail the current registration attempt
- registration.cancel();
- result.failure(new TimeoutException());
-
- // there should not be a second registration attempt
- verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong());
- }
- finally {
- rpc.stopService();
- }
- }
-
- // ------------------------------------------------------------------------
- // test registration
- // ------------------------------------------------------------------------
-
- private static class TestRegistrationSuccess extends RegistrationResponse.Success { // success response carrying an id the tests can assert on
- private static final long serialVersionUID = 5542698790917150604L;
-
- private final String correlationId; // compared against the expected test id in the assertions
-
- private TestRegistrationSuccess(String correlationId) {
- this.correlationId = correlationId;
- }
-
- public String getCorrelationId() { // returns the id passed to the constructor
- return correlationId;
- }
- }
-
- private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> { // RetryingRegistration bound to the test gateway
-
- // we use shorter timeouts here to speed up the tests (the tests compare these values against elapsed milliseconds)
- static final long INITIAL_TIMEOUT = 20;
- static final long MAX_TIMEOUT = 200;
- static final long DELAY_ON_ERROR = 200;
- static final long DELAY_ON_DECLINE = 200;
-
- public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) { // passes the short test timeouts to the base class
- super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
- rpc, "TestEndpoint",
- TestRegistrationGateway.class,
- targetAddress, leaderId,
- INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE);
- }
-
- @Override
- protected Future<RegistrationResponse> invokeRegistration(
- TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
- return gateway.registrationCall(leaderId, timeoutMillis); // a registration attempt is a plain call on the test gateway
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
deleted file mode 100644
index a049e48..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.Futures;
-
-import org.apache.flink.runtime.rpc.TestingGatewayBase;
-import org.apache.flink.util.Preconditions;
-
-import scala.concurrent.Future;
-
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TestRegistrationGateway extends TestingGatewayBase { // test gateway that records calls and replays scripted responses
-
- private final BlockingQueue<RegistrationCall> invocations; // one entry per registrationCall(...), in call order
-
- private final RegistrationResponse[] responses; // scripted replies; a null entry stands for "no reply"
-
- private int pos; // index of the next reply; stops advancing at the last entry
-
- public TestRegistrationGateway(RegistrationResponse... responses) {
- Preconditions.checkArgument(responses != null && responses.length > 0); // at least one scripted reply is required
-
- this.invocations = new LinkedBlockingQueue<>();
- this.responses = responses;
-
- }
-
- // ------------------------------------------------------------------------
-
- public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) { // records the call, then answers with the next scripted reply
- invocations.add(new RegistrationCall(leaderId, timeout));
-
- RegistrationResponse response = responses[pos];
- if (pos < responses.length - 1) { // once the last reply is reached it is repeated for all further calls
- pos++;
- }
-
- // return a completed future (for a proper value), or one that never completes and will time out (for null)
- return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
- }
-
- public BlockingQueue<RegistrationCall> getInvocations() { // exposes the live queue of recorded calls
- return invocations;
- }
-
- // ------------------------------------------------------------------------
-
- public static class RegistrationCall { // immutable record of the arguments of one registrationCall(...)
- private final UUID leaderId;
- private final long timeout;
-
- public RegistrationCall(UUID leaderId, long timeout) {
- this.leaderId = leaderId;
- this.timeout = timeout;
- }
-
- public UUID leaderId() {
- return leaderId;
- }
-
- public long timeout() {
- return timeout;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
deleted file mode 100644
index dfffeda..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * ResourceManager HA test covering the granting and the revocation of leadership.
- */
-public class ResourceManagerHATest {
-
- @Test
- public void testGrantAndRevokeLeadership() throws Exception {
- // mock an RpcService whose startServer method returns a special RpcGateway that runs runAsync calls directly in the calling thread
- TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
- doCallRealMethod().when(gateway).runAsync(any(Runnable.class)); // keep the real runAsync on the otherwise mocked gateway
-
- RpcService rpcService = mock(RpcService.class);
- when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
-
- TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
- TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
- highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
-
- final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
- resourceManager.start();
- // before leadership is granted, the resourceManager's leader session id is null
- Assert.assertNull(resourceManager.getLeaderSessionID());
- final UUID leaderId = UUID.randomUUID();
- leaderElectionService.isLeader(leaderId);
- // after leadership is granted, the resourceManager's leader session id equals the granted id
- Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
- // after leadership is revoked, the resourceManager's leader session id is null again
- leaderElectionService.notLeader();
- Assert.assertNull(resourceManager.getLeaderSessionID());
- }
-
- private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway { // mocked in the test; only runAsync has a real body
- @Override
- public void runAsync(Runnable runnable) {
- runnable.run(); // executes synchronously instead of scheduling the runnable
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
deleted file mode 100644
index 25a670c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.NonHaServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class TaskExecutorTest extends TestLogger { // tests that the TaskExecutor registers at the ResourceManager gateway
-
- @Test
- public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { // with a fixed ResourceManager address, start() must lead to a registration call
- final ResourceID resourceID = ResourceID.generate();
- final String resourceManagerAddress = "/resource/manager/address/one";
-
- final TestingRpcService rpc = new TestingRpcService();
- try {
- // register a mock resource manager gateway
- ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
- rpc.registerGateway(resourceManagerAddress, rmGateway);
-
- NonHaServices haServices = new NonHaServices(resourceManagerAddress); // NOTE(review): presumably reports this fixed address as the leader -- confirm
- TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
- new Configuration(), resourceID, rpc, "localhost", haServices, true);
- String taskManagerAddress = taskManager.getAddress();
- taskManager.start();
-
- verify(rmGateway, timeout(5000)).registerTaskExecutor( // wait up to 5000 ms for the registration call
- any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
- }
- finally {
- rpc.stopService();
- }
- }
-
- @Test
- public void testTriggerRegistrationOnLeaderChange() throws Exception { // each newly announced leader must receive a registration call
- final ResourceID resourceID = ResourceID.generate();
-
- final String address1 = "/resource/manager/address/one";
- final String address2 = "/resource/manager/address/two";
- final UUID leaderId1 = UUID.randomUUID();
- final UUID leaderId2 = UUID.randomUUID();
-
- final TestingRpcService rpc = new TestingRpcService();
- try {
- // register the mock resource manager gateways
- ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
- ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
- rpc.registerGateway(address1, rmGateway1);
- rpc.registerGateway(address2, rmGateway2);
-
- TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); // lets the test announce leaders manually
-
- TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
- haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
- TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
- new Configuration(), resourceID, rpc, "localhost", haServices, true);
- String taskManagerAddress = taskManager.getAddress();
- taskManager.start();
-
- // no connection initially, since there is no leader
- assertNull(taskManager.getResourceManagerConnection());
-
- // define a leader and see that a registration happens
- testLeaderService.notifyListener(address1, leaderId1);
-
- verify(rmGateway1, timeout(5000)).registerTaskExecutor( // wait up to 5000 ms for the registration call
- eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
- assertNotNull(taskManager.getResourceManagerConnection());
-
- // cancel the leader (null address and null leader id)
- testLeaderService.notifyListener(null, null);
-
- // set a new leader, see that a registration happens
- testLeaderService.notifyListener(address2, leaderId2);
-
- verify(rmGateway2, timeout(5000)).registerTaskExecutor( // wait up to 5000 ms for the registration call
- eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
- assertNotNull(taskManager.getResourceManagerConnection());
- }
- finally {
- rpc.stopService();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
new file mode 100644
index 0000000..a8d5bd7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TaskExecutorTest extends TestLogger { // tests that the TaskExecutor registers at the ResourceManager gateway
+
+ @Test
+ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { // with a fixed ResourceManager address, start() must lead to a registration call
+ final ResourceID resourceID = ResourceID.generate();
+ final String resourceManagerAddress = "/resource/manager/address/one";
+
+ final TestingRpcService rpc = new TestingRpcService();
+ try {
+ // register a mock resource manager gateway
+ ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+ rpc.registerGateway(resourceManagerAddress, rmGateway);
+
+ NonHaServices haServices = new NonHaServices(resourceManagerAddress); // NOTE(review): presumably reports this fixed address as the leader -- confirm
+ TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+ new Configuration(), resourceID, rpc, "localhost", haServices, true);
+ String taskManagerAddress = taskManager.getAddress();
+ taskManager.start();
+
+ verify(rmGateway, timeout(5000)).registerTaskExecutor( // wait up to 5000 ms for the registration call
+ any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ public void testTriggerRegistrationOnLeaderChange() throws Exception { // each newly announced leader must receive a registration call
+ final ResourceID resourceID = ResourceID.generate();
+
+ final String address1 = "/resource/manager/address/one";
+ final String address2 = "/resource/manager/address/two";
+ final UUID leaderId1 = UUID.randomUUID();
+ final UUID leaderId2 = UUID.randomUUID();
+
+ final TestingRpcService rpc = new TestingRpcService();
+ try {
+ // register the mock resource manager gateways
+ ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
+ ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
+ rpc.registerGateway(address1, rmGateway1);
+ rpc.registerGateway(address2, rmGateway2);
+
+ TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); // lets the test announce leaders manually
+
+ TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+ TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+ new Configuration(), resourceID, rpc, "localhost", haServices, true);
+ String taskManagerAddress = taskManager.getAddress();
+ taskManager.start();
+
+ // no connection initially, since there is no leader
+ assertNull(taskManager.getResourceManagerConnection());
+
+ // define a leader and see that a registration happens
+ testLeaderService.notifyListener(address1, leaderId1);
+
+ verify(rmGateway1, timeout(5000)).registerTaskExecutor( // wait up to 5000 ms for the registration call
+ eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ assertNotNull(taskManager.getResourceManagerConnection());
+
+ // cancel the leader (null address and null leader id)
+ testLeaderService.notifyListener(null, null);
+
+ // set a new leader, see that a registration happens
+ testLeaderService.notifyListener(address2, leaderId2);
+
+ verify(rmGateway2, timeout(5000)).registerTaskExecutor( // wait up to 5000 ms for the registration call
+ eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ assertNotNull(taskManager.getResourceManagerConnection());
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
+}