You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:45:45 UTC

[03/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
deleted file mode 100644
index 2ee280f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class SlotManagerTest {
-
-	private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
-
-	private static final long DEFAULT_TESTING_MEMORY = 512;
-
-	private static final ResourceProfile DEFAULT_TESTING_PROFILE =
-		new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
-
-	private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
-		new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
-
-	private ResourceManagerGateway resourceManagerGateway;
-
-	@Before
-	public void setUp() {
-		resourceManagerGateway = mock(ResourceManagerGateway.class);
-	}
-
-	/**
-	 * Tests that there are no free slots when we request, need to allocate from cluster manager master
-	 */
-	@Test
-	public void testRequestSlotWithoutFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
-	}
-
-	/**
-	 * Tests that there are some free slots when we request, and the request is fulfilled immediately
-	 */
-	@Test
-	public void testRequestSlotWithFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-		assertEquals(1, slotManager.getFreeSlotCount());
-
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertEquals(0, slotManager.getAllocatedContainers().size());
-	}
-
-	/**
-	 * Tests that there are some free slots when we request, but none of them are suitable
-	 */
-	@Test
-	public void testRequestSlotWithoutSuitableSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
-		assertEquals(2, slotManager.getFreeSlotCount());
-
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(2, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
-	}
-
-	/**
-	 * Tests that we send duplicated slot request
-	 */
-	@Test
-	public void testDuplicatedSlotRequest() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-
-		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
-		slotManager.requestSlot(request1);
-		slotManager.requestSlot(request2);
-		slotManager.requestSlot(request2);
-		slotManager.requestSlot(request1);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
-	}
-
-	/**
-	 * Tests that we send multiple slot requests
-	 */
-	@Test
-	public void testRequestMultipleSlots() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
-
-		// request 3 normal slots
-		for (int i = 0; i < 3; ++i) {
-			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		}
-
-		// request 2 big slots
-		for (int i = 0; i < 2; ++i) {
-			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		}
-
-		// request 1 normal slot again
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
-		assertEquals(4, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(2, slotManager.getPendingRequestCount());
-		assertEquals(2, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
-	 */
-	@Test
-	public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		assertEquals(1, slotManager.getPendingRequestCount());
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, but we have no pending request
-	 */
-	@Test
-	public void testNewlyAppearedFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests
-	 */
-	@Test
-	public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		assertEquals(1, slotManager.getPendingRequestCount());
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, and it's been reported using by some job
-	 */
-	@Test
-	public void testNewlyAppearedInUseSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that we had a slot in-use, and it's confirmed by SlotReport
-	 */
-	@Test
-	public void testExistingInUseSlotUpdateStatus() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
-
-		// slot status is confirmed
-		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
-			request.getAllocationId(), request.getJobId());
-		slotManager.updateSlotStatus(slotStatus2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that we had a slot in-use, but it's empty according to the SlotReport
-	 */
-	@Test
-	public void testExistingInUseSlotAdjustedToEmpty() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request1);
-
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		// another request pending
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertTrue(slotManager.isAllocated(request1.getAllocationId()));
-
-
-		// but slot is reported empty again, request2 will be fulfilled, request1 will be missing
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-	}
-
-	/**
-	 * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
-	 * information didn't match.
-	 */
-	@Test
-	public void testExistingInUseSlotWithDifferentAllocationInfo() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
-		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-		// update slot status with different allocation info
-		slotManager.updateSlotStatus(slotStatus2);
-
-		// original request is missing and won't be allocated
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
-	}
-
-	/**
-	 * Tests that we had a free slot, and it's confirmed by SlotReport
-	 */
-	@Test
-	public void testExistingEmptySlotUpdateStatus() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-	}
-
-	/**
-	 * Tests that we had a free slot, and it's reported in-use by TaskManager
-	 */
-	@Test
-	public void testExistingEmptySlotAdjustedToInUse() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
-			new AllocationID(), new JobID());
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slot.getSlotId()));
-	}
-
-	/**
-	 * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
-	 */
-	@Test
-	public void testSlotAllocationFailedAtTaskManager() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slot.getSlotId()));
-
-		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-	}
-
-
-	/**
-	 * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
-	 */
-	@Test
-	public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		// slot is set empty by heartbeat
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
-		slotManager.updateSlotStatus(slotStatus);
-
-		// another request took this slot
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
-		// original request should be pended
-		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-	}
-
-	@Test
-	public void testNotifyTaskManagerFailure() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		ResourceID resource1 = ResourceID.generate();
-		ResourceID resource2 = ResourceID.generate();
-
-		ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
-		ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
-		ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
-		ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
-
-		slotManager.addFreeSlot(slot11);
-		slotManager.addFreeSlot(slot21);
-
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
-		assertEquals(2, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		slotManager.addFreeSlot(slot12);
-		slotManager.addFreeSlot(slot22);
-
-		assertEquals(2, slotManager.getAllocatedSlotCount());
-		assertEquals(2, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		slotManager.notifyTaskManagerFailure(resource2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		// notify an not exist resource failure
-		slotManager.notifyTaskManagerFailure(ResourceID.generate());
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing utilities
-	// ------------------------------------------------------------------------
-
-	private void directlyProvideFreeSlots(
-		final SlotManager slotManager,
-		final ResourceProfile resourceProfile,
-		final int freeSlotNum)
-	{
-		for (int i = 0; i < freeSlotNum; ++i) {
-			slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing classes
-	// ------------------------------------------------------------------------
-
-	private static class TestingSlotManager extends SlotManager {
-
-		private final List<ResourceProfile> allocatedContainers;
-
-		TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
-			super(resourceManagerGateway);
-			this.allocatedContainers = new LinkedList<>();
-		}
-
-		/**
-		 * Choose slot randomly if it matches requirement
-		 *
-		 * @param request   The slot request
-		 * @param freeSlots All slots which can be used
-		 * @return The chosen slot or null if cannot find a match
-		 */
-		@Override
-		protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
-			for (ResourceSlot slot : freeSlots.values()) {
-				if (slot.isMatchingRequirement(request.getResourceProfile())) {
-					return slot;
-				}
-			}
-			return null;
-		}
-
-		/**
-		 * Choose request randomly if offered slot can match its requirement
-		 *
-		 * @param offeredSlot     The free slot
-		 * @param pendingRequests All the pending slot requests
-		 * @return The chosen request's AllocationID or null if cannot find a match
-		 */
-		@Override
-		protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
-			Map<AllocationID, SlotRequest> pendingRequests)
-		{
-			for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
-				if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
-					return pendingRequest.getValue();
-				}
-			}
-			return null;
-		}
-
-		@Override
-		protected void allocateContainer(ResourceProfile resourceProfile) {
-			allocatedContainers.add(resourceProfile);
-		}
-
-		List<ResourceProfile> getAllocatedContainers() {
-			return allocatedContainers;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
new file mode 100644
index 0000000..80fa19c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior.
+ */
+public class RetryingRegistrationTest extends TestLogger {
+
+	@Test
+	public void testSimpleSuccessfulRegistration() throws Exception {
+		final String testId = "laissez les bon temps roulez";
+		final String testEndpointAddress = "<test-address>";
+		final UUID leaderId = UUID.randomUUID();
+
+		// an endpoint that immediately returns success (a TestRegistrationSuccess carrying testId)
+		TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
+		TestingRpcService rpc = new TestingRpcService();
+
+		try {
+			rpc.registerGateway(testEndpointAddress, testGateway);
+
+			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+			registration.startRegistration();
+
+			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			assertNotNull(future);
+
+			// multiple accesses return the same future
+			assertEquals(future, registration.getFuture());
+
+			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = 
+					Await.result(future, new FiniteDuration(10, SECONDS)); // wait at most 10 seconds for completion
+
+			// validate the returned correlation id and the leader id the gateway was invoked with
+			assertEquals(testId, success.f1.getCorrelationId());
+			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+		}
+		finally { // stop the gateway and the RPC service even when an assertion above fails
+			testGateway.stop();
+			rpc.stopService();
+		}
+	}
+	
+	@Test
+	public void testPropagateFailures() throws Exception {
+		final String testExceptionMessage = "testExceptionMessage";
+
+		// mocked RPC service whose connect(...) throws for any address and any gateway class
+		RpcService rpc = mock(RpcService.class);
+		when(rpc.connect(anyString(), any(Class.class))).thenThrow(new RuntimeException(testExceptionMessage));
+
+		TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
+		registration.startRegistration();
+
+		Future<?> future = registration.getFuture();
+		assertTrue(future.failed().isCompleted()); // checked without waiting: the failure must already be set
+
+		assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage()); // Option -> Try -> Throwable
+	}
+
+	@Test
+	public void testRetryConnectOnFailure() throws Exception {
+		final String testId = "laissez les bon temps roulez";
+		final UUID leaderId = UUID.randomUUID();
+
+		ExecutorService executor = Executors.newCachedThreadPool();
+		TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
+
+		try {
+			// RPC service that fails upon the first connection, but succeeds on the second
+			RpcService rpc = mock(RpcService.class);
+			when(rpc.connect(anyString(), any(Class.class))).thenReturn(
+					Futures.failed(new Exception("test connect failure")),  // first connection attempt fails
+					Futures.successful(testGateway)                         // second connection attempt succeeds
+			);
+			when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor)); // backed by the pool above
+
+			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
+			registration.startRegistration();
+
+			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+					Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS)); // wait at most 10 seconds
+
+			// validate the returned correlation id and the leader id the gateway was invoked with
+			assertEquals(testId, success.f1.getCorrelationId());
+			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+		}
+		finally { // stop the gateway and shut the thread pool down even when an assertion above fails
+			testGateway.stop();
+			executor.shutdown();
+		}
+	}
+
+	@Test
+	public void testRetriesOnTimeouts() throws Exception {
+		final String testId = "rien ne va plus";
+		final String testEndpointAddress = "<test-address>";
+		final UUID leaderId = UUID.randomUUID();
+
+		// an endpoint that immediately returns futures with timeouts before returning a successful future
+		TestRegistrationGateway testGateway = new TestRegistrationGateway(
+				null, // timeout
+				null, // timeout
+				new TestRegistrationSuccess(testId) // success
+		);
+
+		TestingRpcService rpc = new TestingRpcService();
+
+		try {
+			rpc.registerGateway(testEndpointAddress, testGateway);
+	
+			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+	
+			long started = System.nanoTime();
+			registration.startRegistration();
+	
+			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+					Await.result(future, new FiniteDuration(10, SECONDS)); // wait at most 10 seconds
+	
+			long finished = System.nanoTime();
+			long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
+	
+			// validate the returned correlation id and the leader id the gateway was invoked with
+			assertEquals(testId, success.f1.getCorrelationId());
+			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+	
+			// two attempts time out before the success; 3x presumably = initial timeout + doubled retry timeout (TODO confirm)
+			assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
+		}
+		finally { // stop the RPC service and the gateway even when an assertion above fails
+			rpc.stopService();
+			testGateway.stop();
+		}
+	}
+
+	@Test
+	public void testDecline() throws Exception {
+		final String testId = "qui a coupe le fromage";
+		final String testEndpointAddress = "<test-address>";
+		final UUID leaderId = UUID.randomUUID();
+
+		TestingRpcService rpc = new TestingRpcService();
+
+		TestRegistrationGateway testGateway = new TestRegistrationGateway(
+				null, // timeout
+				new RegistrationResponse.Decline("no reason "), // explicit decline
+				null, // timeout
+				new TestRegistrationSuccess(testId) // success
+		);
+
+		try {
+			rpc.registerGateway(testEndpointAddress, testGateway);
+
+			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+			long started = System.nanoTime();
+			registration.startRegistration();
+	
+			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+					Await.result(future, new FiniteDuration(10, SECONDS)); // wait at most 10 seconds
+
+			long finished = System.nanoTime();
+			long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
+
+			// validate the returned correlation id and the leader id the gateway was invoked with
+			assertEquals(testId, success.f1.getCorrelationId());
+			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+
+			// responses: timeout, decline, timeout, success; bound presumably = two initial timeouts + decline delay (TODO confirm)
+			assertTrue("retries did not properly back off", elapsedMillis >= 
+					2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE);
+		}
+		finally { // stop the gateway and the RPC service even when an assertion above fails
+			testGateway.stop();
+			rpc.stopService();
+		}
+	}
+	
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testRetryOnError() throws Exception {
+		final String testId = "Petit a petit, l'oiseau fait son nid";
+		final String testEndpointAddress = "<test-address>";
+		final UUID leaderId = UUID.randomUUID();
+
+		TestingRpcService rpc = new TestingRpcService();
+
+		try {
+			// gateway that upon calls first responds with a failure, then with a success
+			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+
+			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
+					Futures.<RegistrationResponse>failed(new Exception("test exception")), // first call fails
+					Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId))); // second call succeeds
+			
+			rpc.registerGateway(testEndpointAddress, testGateway);
+
+			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+			long started = System.nanoTime();
+			registration.startRegistration();
+
+			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+					Await.result(future, new FiniteDuration(10, SECONDS)); // wait at most 10 seconds
+
+			long finished = System.nanoTime();
+			long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
+			
+			assertEquals(testId, success.f1.getCorrelationId());
+
+			// one failed call precedes the success, so at least the error delay must have elapsed
+			assertTrue("retries did not properly back off",
+					elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR);
+		}
+		finally { // stop the RPC service even when an assertion above fails
+			rpc.stopService();
+		}
+	}
+
+	@Test
+	public void testCancellation() throws Exception {
+		final String testEndpointAddress = "my-test-address";
+		final UUID leaderId = UUID.randomUUID();
+
+		TestingRpcService rpc = new TestingRpcService();
+
+		try {
+			Promise<RegistrationResponse> result = Futures.promise();
+
+			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future());
+
+			rpc.registerGateway(testEndpointAddress, testGateway);
+
+			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+			registration.startRegistration();
+
+			// cancel and fail the current registration attempt
+			registration.cancel();
+			result.failure(new TimeoutException());
+
+			// there should not be a second registration attempt
+			verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong());
+		}
+		finally {
+			rpc.stopService();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test registration
+	// ------------------------------------------------------------------------
+
+	private static class TestRegistrationSuccess extends RegistrationResponse.Success {
+		private static final long serialVersionUID = 5542698790917150604L;
+
+		private final String correlationId;
+
+		private TestRegistrationSuccess(String correlationId) {
+			this.correlationId = correlationId;
+		}
+
+		public String getCorrelationId() {
+			return correlationId;
+		}
+	}
+
+	private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+
+		// we use shorter timeouts here to speed up the tests
+		static final long INITIAL_TIMEOUT = 20;
+		static final long MAX_TIMEOUT = 200;
+		static final long DELAY_ON_ERROR = 200;
+		static final long DELAY_ON_DECLINE = 200;
+
+		public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
+			super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
+					rpc, "TestEndpoint",
+					TestRegistrationGateway.class,
+					targetAddress, leaderId,
+					INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE);
+		}
+
+		@Override
+		protected Future<RegistrationResponse> invokeRegistration(
+				TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
+			return gateway.registrationCall(leaderId, timeoutMillis);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
new file mode 100644
index 0000000..431fbe8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.runtime.rpc.TestingGatewayBase;
+import org.apache.flink.util.Preconditions;
+
+import scala.concurrent.Future;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestRegistrationGateway extends TestingGatewayBase {
+
+	private final BlockingQueue<RegistrationCall> invocations;
+
+	private final RegistrationResponse[] responses;
+
+	private int pos;
+
+	public TestRegistrationGateway(RegistrationResponse... responses) {
+		Preconditions.checkArgument(responses != null && responses.length > 0);
+
+		this.invocations = new LinkedBlockingQueue<>();
+		this.responses = responses;
+		
+	}
+
+	// ------------------------------------------------------------------------
+
+	public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+		invocations.add(new RegistrationCall(leaderId, timeout));
+
+		RegistrationResponse response = responses[pos];
+		if (pos < responses.length - 1) {
+			pos++;
+		}
+
+		// return a completed future (for a proper value), or one that never completes and will time out (for null)
+		return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+	}
+
+	public BlockingQueue<RegistrationCall> getInvocations() {
+		return invocations;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static class RegistrationCall {
+		private final UUID leaderId;
+		private final long timeout;
+
+		public RegistrationCall(UUID leaderId, long timeout) {
+			this.leaderId = leaderId;
+			this.timeout = timeout;
+		}
+
+		public UUID leaderId() {
+			return leaderId;
+		}
+
+		public long timeout() {
+			return timeout;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
deleted file mode 100644
index 32c6cac..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.Option;
-
-
-/**
- * Runs tests to ensure that a cluster is shutdown properly.
- */
-public class ClusterShutdownITCase extends TestLogger {
-
-	private static ActorSystem system;
-
-	private static Configuration config = new Configuration();
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	/**
-	 * Tests a faked cluster shutdown procedure without the ResourceManager.
-	 */
-	@Test
-	public void testClusterShutdownWithoutResourceManager() {
-
-		new JavaTestKit(system){{
-		new Within(duration("30 seconds")) {
-		@Override
-		protected void run() {
-
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
-			// start job manager which doesn't shutdown the actor system
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager1");
-
-			// Tell the JobManager to inform us of shutdown actions
-			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// Register a TaskManager
-			ActorGateway taskManager =
-				TestingUtils.createTaskManager(system, jobManager, config, true, true);
-
-			// Tell the TaskManager to inform us of TaskManager shutdowns
-			taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-
-			// No resource manager connected
-			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
-			expectMsgAllOf(
-				new TestingMessages.ComponentShutdown(taskManager.actor()),
-				new TestingMessages.ComponentShutdown(jobManager.actor()),
-				StopClusterSuccessful.getInstance()
-			);
-
-		}};
-		}};
-	}
-
-	/**
-	 * Tests a faked cluster shutdown procedure with the ResourceManager.
-	 */
-	@Test
-	public void testClusterShutdownWithResourceManager() {
-
-		new JavaTestKit(system){{
-		new Within(duration("30 seconds")) {
-		@Override
-		protected void run() {
-
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
-			// start job manager which doesn't shutdown the actor system
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager2");
-
-			// Tell the JobManager to inform us of shutdown actions
-			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// Register a TaskManager
-			ActorGateway taskManager =
-				TestingUtils.createTaskManager(system, jobManager, config, true, true);
-
-			// Tell the TaskManager to inform us of TaskManager shutdowns
-			taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// Start resource manager and let it register
-			ActorGateway resourceManager =
-				TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
-			// Tell the ResourceManager to inform us of ResourceManager shutdowns
-			resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// notify about a resource manager registration at the job manager
-			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-			// Wait for resource manager
-			expectMsgEquals(Messages.getAcknowledge());
-
-
-			// Shutdown cluster with resource manager connected
-			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
-			expectMsgAllOf(
-				new TestingMessages.ComponentShutdown(taskManager.actor()),
-				new TestingMessages.ComponentShutdown(jobManager.actor()),
-				new TestingMessages.ComponentShutdown(resourceManager.actor()),
-				StopClusterSuccessful.getInstance()
-			);
-
-		}};
-		}};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
new file mode 100644
index 0000000..5799e62
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * ResourceManager HA test, covering the granting and revoking of leadership.
+ */
+public class ResourceManagerHATest {
+
+	@Test
+	public void testGrantAndRevokeLeadership() throws Exception {
+		// mock an RpcService whose startServer method returns a special RpcGateway that executes runAsync calls directly
+		TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
+		doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
+
+		RpcService rpcService = mock(RpcService.class);
+		when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+		resourceManager.start();
+		// before leadership is granted, the resourceManager's leaderId is null
+		Assert.assertNull(resourceManager.getLeaderSessionID());
+		final UUID leaderId = UUID.randomUUID();
+		leaderElectionService.isLeader(leaderId);
+		// after leadership is granted, the resourceManager's leaderId is set
+		Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
+		// after leadership is revoked, the resourceManager's leaderId is null again
+		leaderElectionService.notLeader();
+		Assert.assertNull(resourceManager.getLeaderSessionID());
+	}
+
+	private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
+		@Override
+		public void runAsync(Runnable runnable) {
+			runnable.run();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
deleted file mode 100644
index 0c2ca1a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import scala.Option;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * It cases which test the interaction of the resource manager with job manager and task managers.
- * Runs all tests in one Actor system.
- */
-public class ResourceManagerITCase extends TestLogger {
-
-	private static ActorSystem system;
-
-	private static Configuration config = new Configuration();
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	/**
-	 * Tests whether the resource manager connects and reconciles existing task managers.
-	 */
-	@Test
-	public void testResourceManagerReconciliation() {
-
-		new JavaTestKit(system){{
-		new Within(duration("10 seconds")) {
-		@Override
-		protected void run() {
-
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "ReconciliationTest");
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
-			// !! no resource manager started !!
-
-			ResourceID resourceID = ResourceID.generate();
-
-			TaskManagerLocation location = mock(TaskManagerLocation.class);
-			when(location.getResourceID()).thenReturn(resourceID);
-
-			HardwareDescription resourceProfile = HardwareDescription.extractFromSystem(1_000_000);
-
-			jobManager.tell(
-				new RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 1),
-				me);
-
-			expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
-
-			// now start the resource manager
-			ActorGateway resourceManager =
-				TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
-			// register at testing job manager to receive a message once a resource manager registers
-			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-			// Wait for resource manager
-			expectMsgEquals(Messages.getAcknowledge());
-
-			// check if we registered the task manager resource
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
-
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(1, reply.resources.size());
-			assertTrue(reply.resources.contains(resourceID));
-
-		}};
-		}};
-	}
-
-	/**
-	 * Tests whether the resource manager gets informed upon TaskManager registration.
-	 */
-	@Test
-	public void testResourceManagerTaskManagerRegistration() {
-
-		new JavaTestKit(system){{
-		new Within(duration("30 seconds")) {
-		@Override
-		protected void run() {
-
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "RegTest");
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
-			// start the resource manager
-			ActorGateway resourceManager =
-				TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
-			// notify about a resource manager registration at the job manager
-			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-			// Wait for resource manager
-			expectMsgEquals(Messages.getAcknowledge());
-
-			// start task manager and wait for registration
-			ActorGateway taskManager =
-				TestingUtils.createTaskManager(system, jobManager.actor(), config, true, true);
-
-			// check if we registered the task manager resource
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
-
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(1, reply.resources.size());
-
-		}};
-		}};
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
deleted file mode 100644
index 043c81c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
-import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
-import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * General tests for the resource manager component.
- */
-public class ResourceManagerTest {
-
-	private static ActorSystem system;
-
-	private static ActorGateway fakeJobManager;
-	private static ActorGateway resourceManager;
-
-	private static Configuration config = new Configuration();
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(config);
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	/**
-	 * Tests the registration and reconciliation of the ResourceManager with the JobManager
-	 */
-	@Test
-	public void testJobManagerRegistrationAndReconciliation() {
-		new JavaTestKit(system){{
-		new Within(duration("10 seconds")) {
-		@Override
-		protected void run() {
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-			expectMsgClass(RegisterResourceManager.class);
-
-			List<ResourceID> resourceList = new ArrayList<>();
-			resourceList.add(ResourceID.generate());
-			resourceList.add(ResourceID.generate());
-			resourceList.add(ResourceID.generate());
-
-			resourceManager.tell(
-				new RegisterResourceManagerSuccessful(fakeJobManager.actor(), resourceList),
-				fakeJobManager);
-
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			for (ResourceID id : resourceList) {
-				if (!reply.resources.contains(id)) {
-					fail("Expected to find all resources that were provided during registration.");
-				}
-			}
-		}};
-		}};
-	}
-
-	/**
-	 * Tests delayed or erroneous registration of the ResourceManager with the JobManager
-	 */
-	@Test
-	public void testDelayedJobManagerRegistration() {
-		new JavaTestKit(system){{
-		new Within(duration("10 seconds")) {
-		@Override
-		protected void run() {
-
-			// set a short timeout for lookups
-			Configuration shortTimeoutConfig = config.clone();
-			shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
-
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
-
-			// wait for registration message
-			RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
-			// give wrong response
-			getLastSender().tell(new JobManagerMessages.LeaderSessionMessage(null, new Object()),
-				fakeJobManager.actor());
-
-			// expect another retry and let it time out
-			expectMsgClass(RegisterResourceManager.class);
-
-			// wait for next try after timeout
-			expectMsgClass(RegisterResourceManager.class);
-
-		}};
-		}};
-	}
-
-	@Test
-	public void testTriggerReconnect() {
-		new JavaTestKit(system){{
-		new Within(duration("10 seconds")) {
-		@Override
-		protected void run() {
-
-			// set a long timeout for lookups such that the test fails in case of timeouts
-			Configuration shortTimeoutConfig = config.clone();
-			shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
-
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
-
-			// wait for registration message
-			RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
-			// all went well
-			resourceManager.tell(
-				new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
-				fakeJobManager);
-
-			// force a reconnect
-			resourceManager.tell(
-				new TriggerRegistrationAtJobManager(fakeJobManager.actor()),
-				fakeJobManager);
-
-			// new registration attempt should come in
-			expectMsgClass(RegisterResourceManager.class);
-
-		}};
-		}};
-	}
-
-	/**
-	 * Tests the registration and accounting of resources at the ResourceManager; repeated and null-id registrations must leave the count at one.
-	 */
-	@Test
-	public void testTaskManagerRegistration() {
-		new JavaTestKit(system){{
-		new Within(duration("10 seconds")) {
-		@Override
-		protected void run() {
-
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-			// expect the RegisterResourceManager request and confirm it with an empty resource list
-			expectMsgClass(RegisterResourceManager.class);
-			resourceManager.tell(
-				new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
-				fakeJobManager);
-
-			ResourceID resourceID = ResourceID.generate();
-
-			// notify the ResourceManager that the resource has started (first registration)
-			resourceManager.tell(new NotifyResourceStarted(resourceID),
-				fakeJobManager);
-
-			expectMsgClass(Acknowledge.class);
-
-			// query the registered resources; exactly one must be reported
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(1, reply.resources.size());
-
-			// send the same notification again for the identical resource id
-			resourceManager.tell(new NotifyResourceStarted(resourceID),
-				fakeJobManager);
-
-			expectMsgClass(Acknowledge.class);
-
-			// query again; the duplicate must not have been counted a second time
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-			reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(1, reply.resources.size());
-
-			// send an invalid null resource id, meant to provoke an exception during resource registration; an Acknowledge is still expected
-			resourceManager.tell(new NotifyResourceStarted(null),
-				fakeJobManager);
-
-			expectMsgClass(Acknowledge.class);
-
-			// query once more; the failed registration must not have changed the count
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-			reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(1, reply.resources.size());
-		}};
-		}};
-	}
-
-	@Test
-	public void testResourceRemoval() {
-		new JavaTestKit(system){{
-		new Within(duration("10 seconds")) {
-		@Override
-		protected void run() {
-			// Scenario: remove an unknown resource, register it, then remove it again and check that no resource remains registered.
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-			// expect the RegisterResourceManager request and confirm it with an empty resource list
-			expectMsgClass(RegisterResourceManager.class);
-			resourceManager.tell(
-				new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
-				fakeJobManager);
-
-			ResourceID resourceID = ResourceID.generate();
-
-			// ask to remove a resource that has not been registered yet; no reply is awaited for this message
-			resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
-
-			// notify the ResourceManager that the resource has started
-			resourceManager.tell(new NotifyResourceStarted(resourceID),
-				fakeJobManager);
-
-			expectMsgClass(Acknowledge.class);
-
-			// query the registered resources; exactly the one registered above must be reported
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(1, reply.resources.size());
-			assertTrue(reply.resources.contains(resourceID));
-
-			// remove the resource that is now registered
-			resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
-
-			// query again; no resource may be left after the removal
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-			reply =	expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(0, reply.resources.size());
-
-		}};
-		}};
-	}
-
-	/**
-	 * Tests notification of JobManager about failed resources: two resources are registered and failed, and one ResourceRemoved message per resource is expected.
-	 */
-	@Test
-	public void testResourceFailureNotification() {
-		new JavaTestKit(system){{
-		new Within(duration("10 seconds")) {
-		@Override
-		protected void run() {
-
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-			// expect the RegisterResourceManager request and confirm it with an empty resource list
-			expectMsgClass(RegisterResourceManager.class);
-			resourceManager.tell(
-				new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
-				fakeJobManager);
-
-			ResourceID resourceID1 = ResourceID.generate();
-			ResourceID resourceID2 = ResourceID.generate();
-
-			// notify the ResourceManager that the first resource has started
-			resourceManager.tell(new NotifyResourceStarted(resourceID1),
-				fakeJobManager);
-
-			expectMsgClass(Acknowledge.class);
-
-			// notify the ResourceManager that the second resource has started
-			resourceManager.tell(new NotifyResourceStarted(resourceID2),
-				fakeJobManager);
-
-			expectMsgClass(Acknowledge.class);
-
-			// query the registered resources; both must be reported
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(2, reply.resources.size());
-			assertTrue(reply.resources.contains(resourceID1));
-			assertTrue(reply.resources.contains(resourceID2));
-
-			// fail both resources; the assertions below expect the ResourceRemoved messages in this same order
-			resourceManager.tell(new TestingResourceManager.FailResource(resourceID1), fakeJobManager);
-			resourceManager.tell(new TestingResourceManager.FailResource(resourceID2), fakeJobManager);
-
-			ResourceRemoved answer = expectMsgClass(ResourceRemoved.class);
-			ResourceRemoved answer2 = expectMsgClass(ResourceRemoved.class);
-
-			assertEquals(resourceID1, answer.resourceId());
-			assertEquals(resourceID2, answer2.resourceId());
-
-		}};
-		}};
-	}
-}