You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/20 08:54:54 UTC

[1/2] flink git commit: [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

Repository: flink
Updated Branches:
  refs/heads/flip-6 17b83f11b -> d159de62f


http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
deleted file mode 100644
index 52d9d06..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class SlotManagerTest {
-
-	private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
-
-	private static final long DEFAULT_TESTING_MEMORY = 512;
-
-	private static final ResourceProfile DEFAULT_TESTING_PROFILE =
-		new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
-
-	private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
-		new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
-
-	private ResourceManagerGateway resourceManagerGateway;
-
-	@Before
-	public void setUp() {
-		resourceManagerGateway = mock(ResourceManagerGateway.class);
-	}
-
-	/**
-	 * Tests that there are no free slots when we request, need to allocate from cluster manager master
-	 */
-	@Test
-	public void testRequestSlotWithoutFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
-	}
-
-	/**
-	 * Tests that there are some free slots when we request, and the request is fulfilled immediately
-	 */
-	@Test
-	public void testRequestSlotWithFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-		assertEquals(1, slotManager.getFreeSlotCount());
-
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertEquals(0, slotManager.getAllocatedContainers().size());
-	}
-
-	/**
-	 * Tests that there are some free slots when we request, but none of them are suitable
-	 */
-	@Test
-	public void testRequestSlotWithoutSuitableSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
-		assertEquals(2, slotManager.getFreeSlotCount());
-
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(2, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
-	}
-
-	/**
-	 * Tests that we send duplicated slot request
-	 */
-	@Test
-	public void testDuplicatedSlotRequest() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-
-		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
-		slotManager.requestSlot(request1);
-		slotManager.requestSlot(request2);
-		slotManager.requestSlot(request2);
-		slotManager.requestSlot(request1);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
-	}
-
-	/**
-	 * Tests that we send multiple slot requests
-	 */
-	@Test
-	public void testRequestMultipleSlots() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
-
-		// request 3 normal slots
-		for (int i = 0; i < 3; ++i) {
-			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		}
-
-		// request 2 big slots
-		for (int i = 0; i < 2; ++i) {
-			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		}
-
-		// request 1 normal slot again
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
-		assertEquals(4, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(2, slotManager.getPendingRequestCount());
-		assertEquals(2, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
-	 */
-	@Test
-	public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		assertEquals(1, slotManager.getPendingRequestCount());
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, but we have no pending request
-	 */
-	@Test
-	public void testNewlyAppearedFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests
-	 */
-	@Test
-	public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		assertEquals(1, slotManager.getPendingRequestCount());
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that a new slot appeared in SlotReport, and it's been reported using by some job
-	 */
-	@Test
-	public void testNewlyAppearedInUseSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that we had a slot in-use, and it's confirmed by SlotReport
-	 */
-	@Test
-	public void testExistingInUseSlotUpdateStatus() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
-
-		// slot status is confirmed
-		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
-			request.getAllocationId(), request.getJobId());
-		slotManager.updateSlotStatus(slotStatus2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
-	}
-
-	/**
-	 * Tests that we had a slot in-use, but it's empty according to the SlotReport
-	 */
-	@Test
-	public void testExistingInUseSlotAdjustedToEmpty() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request1);
-
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		// another request pending
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertTrue(slotManager.isAllocated(request1.getAllocationId()));
-
-
-		// but slot is reported empty again, request2 will be fulfilled, request1 will be missing
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-	}
-
-	/**
-	 * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
-	 * information didn't match.
-	 */
-	@Test
-	public void testExistingInUseSlotWithDifferentAllocationInfo() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
-		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-		// update slot status with different allocation info
-		slotManager.updateSlotStatus(slotStatus2);
-
-		// original request is missing and won't be allocated
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
-	}
-
-	/**
-	 * Tests that we had a free slot, and it's confirmed by SlotReport
-	 */
-	@Test
-	public void testExistingEmptySlotUpdateStatus() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-	}
-
-	/**
-	 * Tests that we had a free slot, and it's reported in-use by TaskManager
-	 */
-	@Test
-	public void testExistingEmptySlotAdjustedToInUse() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
-			new AllocationID(), new JobID());
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slot.getSlotId()));
-	}
-
-	/**
-	 * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
-	 */
-	@Test
-	public void testSlotAllocationFailedAtTaskManager() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slot.getSlotId()));
-
-		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-	}
-
-
-	/**
-	 * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
-	 */
-	@Test
-	public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
-		slotManager.addFreeSlot(slot);
-
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		// slot is set empty by heartbeat
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
-		slotManager.updateSlotStatus(slotStatus);
-
-		// another request took this slot
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
-		// original request should be pended
-		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-	}
-
-	@Test
-	public void testNotifyTaskManagerFailure() {
-		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
-
-		ResourceID resource1 = ResourceID.generate();
-		ResourceID resource2 = ResourceID.generate();
-
-		ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
-		ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
-		ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
-		ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
-
-		slotManager.addFreeSlot(slot11);
-		slotManager.addFreeSlot(slot21);
-
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
-		assertEquals(2, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		slotManager.addFreeSlot(slot12);
-		slotManager.addFreeSlot(slot22);
-
-		assertEquals(2, slotManager.getAllocatedSlotCount());
-		assertEquals(2, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		slotManager.notifyTaskManagerFailure(resource2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		// notify an not exist resource failure
-		slotManager.notifyTaskManagerFailure(ResourceID.generate());
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing utilities
-	// ------------------------------------------------------------------------
-
-	private void directlyProvideFreeSlots(
-		final SlotManager slotManager,
-		final ResourceProfile resourceProfile,
-		final int freeSlotNum)
-	{
-		for (int i = 0; i < freeSlotNum; ++i) {
-			slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing classes
-	// ------------------------------------------------------------------------
-
-	private static class TestingSlotManager extends SlotManager {
-
-		private final List<ResourceProfile> allocatedContainers;
-
-		TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
-			super(resourceManagerGateway);
-			this.allocatedContainers = new LinkedList<>();
-		}
-
-		/**
-		 * Choose slot randomly if it matches requirement
-		 *
-		 * @param request   The slot request
-		 * @param freeSlots All slots which can be used
-		 * @return The chosen slot or null if cannot find a match
-		 */
-		@Override
-		protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
-			for (ResourceSlot slot : freeSlots.values()) {
-				if (slot.isMatchingRequirement(request.getResourceProfile())) {
-					return slot;
-				}
-			}
-			return null;
-		}
-
-		/**
-		 * Choose request randomly if offered slot can match its requirement
-		 *
-		 * @param offeredSlot     The free slot
-		 * @param pendingRequests All the pending slot requests
-		 * @return The chosen request's AllocationID or null if cannot find a match
-		 */
-		@Override
-		protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
-			Map<AllocationID, SlotRequest> pendingRequests)
-		{
-			for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
-				if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
-					return pendingRequest.getValue();
-				}
-			}
-			return null;
-		}
-
-		@Override
-		protected void allocateContainer(ResourceProfile resourceProfile) {
-			allocatedContainers.add(resourceProfile);
-		}
-
-		List<ResourceProfile> getAllocatedContainers() {
-			return allocatedContainers;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
new file mode 100644
index 0000000..9ee9690
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+	private static final double DEFAULT_TESTING_CPU_CORES = 1.0; // CPU cores of a default-sized testing slot
+
+	private static final long DEFAULT_TESTING_MEMORY = 512; // memory of a default-sized slot; unit as interpreted by ResourceProfile (TODO confirm)
+
+	private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+		new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY); // profile of the ordinary slots and requests
+
+	private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+		new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); // doubled on both dimensions, so a default-sized slot cannot serve it
+
+	private static TaskExecutorGateway taskExecutorGateway; // one mock shared by every test, created in setUp()
+
+	@BeforeClass
+	public static void setUp() { // @BeforeClass: runs once before all tests in this class
+		taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
+	}
+
+	/**
+	 * Tests that a request arriving while no slot is free stays pending and triggers a new container allocation
+	 */
+	@Test
+	public void testRequestSlotWithoutFreeSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertEquals(1, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
+	}
+
+	/**
+	 * Tests that a request is fulfilled immediately from a matching free slot, without allocating a new container
+	 */
+	@Test
+	public void testRequestSlotWithFreeSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+		assertEquals(1, slotManager.getFreeSlotCount());
+
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertEquals(0, slotManager.getAllocatedContainers().size());
+	}
+
+	/**
+	 * Tests that a request stays pending and triggers a container allocation when none of the free slots is large enough
+	 */
+	@Test
+	public void testRequestSlotWithoutSuitableSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
+		assertEquals(2, slotManager.getFreeSlotCount());
+
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(2, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertEquals(1, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+	}
+
+	/**
+	 * Tests that submitting the same requests repeatedly allocates neither extra slots nor extra containers
+	 */
+	@Test
+	public void testDuplicatedSlotRequest() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+
+		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+		slotManager.requestSlot(request1);
+		slotManager.requestSlot(request2);
+		slotManager.requestSlot(request2);
+		slotManager.requestSlot(request1);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertEquals(1, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+	}
+
+	/**
+	 * Tests a mix of requests: fitting ones take free slots, oversized ones stay pending and each trigger a container allocation
+	 */
+	@Test
+	public void testRequestMultipleSlots() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
+
+		// request 3 normal slots
+		for (int i = 0; i < 3; ++i) {
+			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		}
+
+		// request 2 big slots
+		for (int i = 0; i < 2; ++i) {
+			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+		}
+
+		// request 1 normal slot again
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+		assertEquals(4, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(2, slotManager.getPendingRequestCount());
+		assertEquals(2, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
+	}
+
+	/**
+	 * Tests that a slot newly appearing in a slot status update is used to fulfill a matching pending request
+	 */
+	@Test
+	public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		assertEquals(1, slotManager.getPendingRequestCount());
+
+		SlotID slotId = SlotID.generate();
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slotId));
+	}
+
+	/**
+	 * Tests that a slot newly appearing in a slot status update is kept free when no request is pending
+	 */
+	@Test
+	public void testNewlyAppearedFreeSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+
+		SlotID slotId = SlotID.generate();
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+	}
+
+	/**
+	 * Tests that a newly appearing slot stays free when it is not suitable for any of the pending requests
+	 */
+	@Test
+	public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+		assertEquals(1, slotManager.getPendingRequestCount());
+
+		SlotID slotId = SlotID.generate();
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertFalse(slotManager.isAllocated(slotId));
+	}
+
+	/**
+	 * Tests that a slot newly reported in a SlotReport as already in use by some job is tracked as allocated.
+	 */
+	@Test
+	public void testNewlyAppearedInUseSlot() {
+		final TestingSlotManager manager = new TestingSlotManager();
+
+		// the report carries a JobID and an AllocationID, i.e. the slot is already occupied
+		final SlotID reportedSlot = SlotID.generate();
+		manager.registerTaskExecutor(reportedSlot.getResourceID(), taskExecutorGateway);
+		manager.updateSlotStatus(new SlotStatus(reportedSlot, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID()));
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertTrue(manager.isAllocated(reportedSlot));
+	}
+
+	/**
+	 * Tests that an allocated slot stays allocated when a SlotReport confirms the same job and allocation.
+	 */
+	@Test
+	public void testExistingInUseSlotUpdateStatus() {
+		final TestingSlotManager manager = new TestingSlotManager();
+		final SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		manager.requestSlot(request);
+
+		// a free slot shows up and is assigned to the pending request
+		final SlotID reportedSlot = SlotID.generate();
+		manager.registerTaskExecutor(reportedSlot.getResourceID(), taskExecutorGateway);
+		manager.updateSlotStatus(new SlotStatus(reportedSlot, DEFAULT_TESTING_PROFILE));
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertTrue(manager.isAllocated(reportedSlot));
+
+		// the TaskExecutor now reports the slot as occupied by exactly that request
+		final SlotStatus confirmation =
+			new SlotStatus(reportedSlot, DEFAULT_TESTING_PROFILE, request.getJobId(), request.getAllocationId());
+		manager.updateSlotStatus(confirmation);
+
+		// the confirmation matches what the SlotManager already knew, so the counts stay the same
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertTrue(manager.isAllocated(reportedSlot));
+	}
+
+	/**
+	 * Tests that an allocated slot which a SlotReport shows as empty is handed to the next pending request.
+	 */
+	@Test
+	public void testExistingInUseSlotAdjustedToEmpty() {
+		final TestingSlotManager manager = new TestingSlotManager();
+		final SlotRequest firstRequest = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		manager.requestSlot(firstRequest);
+
+		// a free slot shows up and is assigned to the first request
+		final SlotID reportedSlot = SlotID.generate();
+		manager.registerTaskExecutor(reportedSlot.getResourceID(), taskExecutorGateway);
+		final SlotStatus emptyStatus = new SlotStatus(reportedSlot, DEFAULT_TESTING_PROFILE);
+		manager.updateSlotStatus(emptyStatus);
+
+		// a second request has to wait because there is no other slot
+		final SlotRequest secondRequest = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		manager.requestSlot(secondRequest);
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(1, manager.getPendingRequestCount());
+		assertTrue(manager.isAllocated(reportedSlot));
+		assertTrue(manager.isAllocated(firstRequest.getAllocationId()));
+
+		// the slot is reported empty again: the second request takes it over,
+		// while the first request goes missing
+		manager.updateSlotStatus(emptyStatus);
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+		assertTrue(manager.isAllocated(reportedSlot));
+		assertTrue(manager.isAllocated(secondRequest.getAllocationId()));
+	}
+
+	/**
+	 * Tests that when a SlotReport shows an allocated slot occupied by a different allocation than the one
+	 * the SlotManager assigned, the reported allocation is tracked and the original request is dropped.
+	 */
+	@Test
+	public void testExistingInUseSlotWithDifferentAllocationInfo() {
+		final TestingSlotManager manager = new TestingSlotManager();
+		final SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		manager.requestSlot(request);
+
+		// a free slot shows up and is assigned to the request
+		final SlotID reportedSlot = SlotID.generate();
+		manager.registerTaskExecutor(reportedSlot.getResourceID(), taskExecutorGateway);
+		final SlotStatus emptyStatus = new SlotStatus(reportedSlot, DEFAULT_TESTING_PROFILE);
+		manager.updateSlotStatus(emptyStatus);
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+		assertTrue(manager.isAllocated(reportedSlot));
+		assertTrue(manager.isAllocated(request.getAllocationId()));
+
+		// the TaskExecutor claims the slot belongs to some other job / allocation
+		final SlotStatus foreignStatus = new SlotStatus(reportedSlot, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
+		manager.updateSlotStatus(foreignStatus);
+
+		// the original request is lost (not re-queued); the reported allocation is tracked instead
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+		assertTrue(manager.isAllocated(reportedSlot));
+		assertFalse(manager.isAllocated(request.getAllocationId()));
+		assertTrue(manager.isAllocated(foreignStatus.getAllocationID()));
+	}
+
+	/**
+	 * Tests that a free slot stays free when a SlotReport confirms that it is empty.
+	 */
+	@Test
+	public void testExistingEmptySlotUpdateStatus() {
+		final TestingSlotManager manager = new TestingSlotManager();
+		final ResourceSlot freeSlot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		manager.addFreeSlot(freeSlot);
+
+		// the SlotReport lists the slot without any allocation
+		manager.updateSlotStatus(new SlotStatus(freeSlot.getSlotId(), DEFAULT_TESTING_PROFILE));
+
+		assertEquals(0, manager.getAllocatedSlotCount());
+		assertEquals(1, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+	}
+
+	/**
+	 * Tests that a free slot becomes allocated when a SlotReport shows it occupied by some job.
+	 */
+	@Test
+	public void testExistingEmptySlotAdjustedToInUse() {
+		final TestingSlotManager manager = new TestingSlotManager();
+		final SlotID slotID = SlotID.generate();
+		manager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway);
+
+		final ResourceSlot freeSlot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		manager.addFreeSlot(freeSlot);
+
+		// the TaskExecutor reports an allocation the SlotManager did not know about
+		manager.updateSlotStatus(
+			new SlotStatus(freeSlot.getSlotId(), DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID()));
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+		assertTrue(manager.isAllocated(freeSlot.getSlotId()));
+	}
+
+	/**
+	 * Tests that a slot request which failed at / was rejected by the TaskManager is retried.
+	 */
+	@Test
+	public void testSlotAllocationFailedAtTaskManager() {
+		final TestingSlotManager manager = new TestingSlotManager();
+		final ResourceSlot freeSlot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		manager.addFreeSlot(freeSlot);
+
+		final SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		manager.requestSlot(request);
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+		assertTrue(manager.isAllocated(freeSlot.getSlotId()));
+		// the TaskManager rejects the allocation; the retried request takes the only slot again
+		manager.handleSlotRequestFailedAtTaskManager(request, freeSlot.getSlotId());
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+	}
+
+
+	/**
+	 * Tests that a request rejected by the TaskManager is put back into the pending requests when its
+	 * slot has meanwhile been taken by another request.
+	 */
+	@Test
+	public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+		final TestingSlotManager manager = new TestingSlotManager();
+		final SlotID slotID = SlotID.generate();
+		manager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway);
+
+		final ResourceSlot slot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		manager.addFreeSlot(slot);
+
+		final SlotRequest firstRequest = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		manager.requestSlot(firstRequest);
+
+		// a SlotReport marks the slot as empty again
+		manager.updateSlotStatus(new SlotStatus(slot.getSlotId(), slot.getResourceProfile()));
+
+		// a second request grabs the slot
+		final SlotRequest secondRequest = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		manager.requestSlot(secondRequest);
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(0, manager.getPendingRequestCount());
+		assertFalse(manager.isAllocated(firstRequest.getAllocationId()));
+		assertTrue(manager.isAllocated(secondRequest.getAllocationId()));
+
+		// the TaskManager now rejects the first request, which has to wait for another slot
+		manager.handleSlotRequestFailedAtTaskManager(firstRequest, slot.getSlotId());
+
+		assertEquals(1, manager.getAllocatedSlotCount());
+		assertEquals(0, manager.getFreeSlotCount());
+		assertEquals(1, manager.getPendingRequestCount());
+		assertFalse(manager.isAllocated(firstRequest.getAllocationId()));
+		assertTrue(manager.isAllocated(secondRequest.getAllocationId()));
+	}
+
+	/** Tests that a TaskManager failure removes all of its slots, allocated as well as free ones. */
+	@Test
+	public void testNotifyTaskManagerFailure() {
+		TestingSlotManager slotManager = new TestingSlotManager();
+		ResourceID resource1 = ResourceID.generate();
+		ResourceID resource2 = ResourceID.generate();
+
+		ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+
+		// the first slot of each TaskManager is offered and taken by one of two requests
+		slotManager.addFreeSlot(slot11);
+		slotManager.addFreeSlot(slot21);
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+		assertEquals(2, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+
+		// the second slot of each TaskManager stays free
+		slotManager.addFreeSlot(slot12);
+		slotManager.addFreeSlot(slot22);
+
+		assertEquals(2, slotManager.getAllocatedSlotCount());
+		assertEquals(2, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+
+		// resource2 fails: its allocated and its free slot both disappear, and no request is re-queued
+		slotManager.notifyTaskManagerFailure(resource2);
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+
+		// a failure notification for an unknown resource changes nothing
+		slotManager.notifyTaskManagerFailure(ResourceID.generate());
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing utilities
+	// ------------------------------------------------------------------------
+
+	private void directlyProvideFreeSlots(
+		final SlotManager slotManager,
+		final ResourceProfile resourceProfile,
+		final int freeSlotNum)
+	{
+		for (int i = 0; i < freeSlotNum; ++i) {
+			slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorGateway));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing classes
+	// ------------------------------------------------------------------------
+
+	/** SlotManager for tests: picks the first matching slot / request and records requested containers. */
+	private static class TestingSlotManager extends SlotManager {
+
+		/** Resource profiles passed to {@link #allocateContainer(ResourceProfile)}, in call order. */
+		private final List<ResourceProfile> allocatedContainers;
+
+		TestingSlotManager() {
+			this.allocatedContainers = new LinkedList<>();
+		}
+
+		/**
+		 * Chooses the first free slot, in the map's iteration order, whose profile matches the request.
+		 *
+		 * @param request   The slot request
+		 * @param freeSlots All slots which can be used
+		 * @return The chosen slot or null if no slot matches
+		 */
+		@Override
+		protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+			for (ResourceSlot slot : freeSlots.values()) {
+				if (slot.isMatchingRequirement(request.getResourceProfile())) {
+					return slot;
+				}
+			}
+			return null;
+		}
+
+		/**
+		 * Chooses the first pending request, in the map's iteration order, that the offered slot can serve.
+		 *
+		 * @param offeredSlot     The free slot
+		 * @param pendingRequests All the pending slot requests
+		 * @return The chosen request or null if no request matches
+		 */
+		@Override
+		protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
+			for (SlotRequest pendingRequest : pendingRequests.values()) {
+				if (offeredSlot.isMatchingRequirement(pendingRequest.getResourceProfile())) {
+					return pendingRequest;
+				}
+			}
+			return null;
+		}
+
+		@Override
+		protected void allocateContainer(ResourceProfile resourceProfile) {
+			allocatedContainers.add(resourceProfile);
+		}
+
+		List<ResourceProfile> getAllocatedContainers() {
+			return allocatedContainers;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
new file mode 100644
index 0000000..85d2880
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest extends TestLogger {
+
+	private static TestingSerialRpcService testRpcService;
+
+	@BeforeClass
+	public static void beforeClass() {
+		testRpcService = new TestingSerialRpcService();
+	}
+
+	@AfterClass
+	public static void afterClass() {
+		testRpcService.stopService();
+		testRpcService = null;
+	}
+
+	@Before
+	public void beforeTest(){
+		testRpcService.clearGateways();
+	}
+
+	/**
+	 * Tests whether
+	 * 1) SlotRequest is routed to the SlotManager
+	 * 2) SlotRequest is confirmed
+	 * 3) SlotRequest leads to a container allocation
+	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
+	 */
+	@Test
+	public void testSlotsUnavailableRequest() throws Exception {
+		final String rmAddress = "/rm1";
+		final String jmAddress = "/jm1";
+		final JobID jobID = new JobID();
+
+		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+
+
+		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+		ResourceManager resourceManager =
+			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+		resourceManager.start();
+
+		Future<RegistrationResponse> registrationFuture =
+			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+		try {
+			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+		} catch (Exception e) {
+			Assert.fail("JobManager registration Future didn't become ready.");
+		}
+
+		final AllocationID allocationID = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
+
+		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
+		SlotRequestReply slotRequestReply =
+			resourceManager.requestSlot(slotRequest);
+
+		// 1) SlotRequest is routed to the SlotManager
+		verify(slotManager).requestSlot(slotRequest);
+
+		// 2) SlotRequest is confirmed
+		Assert.assertEquals(
+			slotRequestReply.getAllocationID(),
+			allocationID);
+
+		// 3) SlotRequest leads to a container allocation
+		verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
+
+		Assert.assertFalse(slotManager.isAllocated(allocationID));
+
+		// slot becomes available
+		final String tmAddress = "/tm1";
+		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+		final ResourceID resourceID = ResourceID.generate();
+		final SlotID slotID = new SlotID(resourceID, 0);
+
+		final SlotStatus slotStatus =
+			new SlotStatus(slotID, resourceProfile);
+		final SlotReport slotReport =
+			new SlotReport(Collections.singletonList(slotStatus), resourceID);
+		// register slot at SlotManager
+		slotManager.registerTaskExecutor(resourceID, taskExecutorGateway);
+		slotManager.updateSlotStatus(slotReport);
+
+		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
+		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+	}
+
+	/**
+	 * Tests whether
+	 * 1) a SlotRequest is routed to the SlotManager
+	 * 2) a SlotRequest is confirmed
+	 * 3) a SlotRequest leads to an allocation of a registered slot
+	 * 4) a SlotRequest is routed to the TaskExecutor
+	 */
+	@Test
+	public void testSlotAvailableRequest() throws Exception {
+		final String rmAddress = "/rm1";
+		final String jmAddress = "/jm1";
+		final String tmAddress = "/tm1";
+		final JobID jobID = new JobID();
+
+		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+
+		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+		ResourceManager resourceManager =
+			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+		resourceManager.start();
+
+		Future<RegistrationResponse> registrationFuture =
+			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+		try {
+			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+		} catch (Exception e) {
+			Assert.fail("JobManager registration Future didn't become ready.");
+		}
+
+		final ResourceID resourceID = ResourceID.generate();
+		final AllocationID allocationID = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
+		final SlotID slotID = new SlotID(resourceID, 0);
+
+		final SlotStatus slotStatus =
+			new SlotStatus(slotID, resourceProfile);
+		final SlotReport slotReport =
+			new SlotReport(Collections.singletonList(slotStatus), resourceID);
+		// register slot at SlotManager
+		slotManager.registerTaskExecutor(resourceID, taskExecutorGateway);
+		slotManager.updateSlotStatus(slotReport);
+
+		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
+		SlotRequestReply slotRequestReply =
+			resourceManager.requestSlot(slotRequest);
+
+		// 1) a SlotRequest is routed to the SlotManager
+		verify(slotManager).requestSlot(slotRequest);
+
+		// 2) a SlotRequest is confirmed
+		Assert.assertEquals(
+			slotRequestReply.getAllocationID(),
+			allocationID);
+
+		// 3) a SlotRequest leads to an allocation of a registered slot
+		Assert.assertTrue(slotManager.isAllocated(slotID));
+		Assert.assertTrue(slotManager.isAllocated(allocationID));
+
+
+		// 4) a SlotRequest is routed to the TaskExecutor
+		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+	}
+
+
+	private static class TestingSlotManager extends SimpleSlotManager {
+
+		// change visibility of function to public for testing
+		@Override
+		public void allocateContainer(ResourceProfile resourceProfile) {
+			super.allocateContainer(resourceProfile);
+		}
+
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 7e92e8d..2212680 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -112,4 +112,8 @@ public class TestingRpcService extends AkkaRpcService {
 			return Futures.failed(new Exception("No gateway registered under that name"));
 		}
 	}
-}
\ No newline at end of file
+
+	public void clearGateways() { // forget every gateway registered with this service so far
+		this.registeredConnections.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 955edcc..01776ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -137,6 +137,10 @@ public class TestingSerialRpcService implements RpcService {
 		}
 	}
 
+	public void clearGateways() { // forget every gateway registered with this service so far
+		this.registeredConnections.clear();
+	}
+
 	private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
 
 		private final T rpcEndpoint;


[2/2] flink git commit: [FLINK-4538][FLINK-4348] ResourceManager slot allocation protcol

Posted by mx...@apache.org.
[FLINK-4538][FLINK-4348] ResourceManager slot allocation protcol

- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot
- adds SlotManager as RM constructor parameter
- adds LeaderRetrievalListener to SlotManager to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot

This closes #2463


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d159de62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d159de62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d159de62

Branch: refs/heads/flip-6
Commit: d159de62f3bb0877fbc046256e17a6a0d94cd1ee
Parents: 17b83f1
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Sep 1 16:53:31 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 20 10:54:38 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java |   2 +-
 .../clusterframework/types/ResourceSlot.java    |  14 +-
 .../resourcemanager/JobMasterRegistration.java  |  10 +-
 .../resourcemanager/RegistrationResponse.java   |   9 +-
 .../resourcemanager/ResourceManager.java        | 167 +++---
 .../resourcemanager/ResourceManagerGateway.java |   2 +-
 .../runtime/resourcemanager/SlotAssignment.java |  25 -
 .../runtime/resourcemanager/SlotManager.java    | 523 -----------------
 .../resourcemanager/SlotRequestRegistered.java  |  33 ++
 .../resourcemanager/SlotRequestRejected.java    |  34 ++
 .../resourcemanager/SlotRequestReply.java       |  41 ++
 .../slotmanager/SimpleSlotManager.java          |  59 ++
 .../slotmanager/SlotManager.java                | 579 +++++++++++++++++++
 .../flink/runtime/taskexecutor/SlotStatus.java  |   5 +-
 .../taskexecutor/TaskExecutorGateway.java       |  17 +
 .../resourcemanager/ResourceManagerHATest.java  |   4 +-
 .../resourcemanager/SlotManagerTest.java        | 538 -----------------
 .../slotmanager/SlotManagerTest.java            | 554 ++++++++++++++++++
 .../slotmanager/SlotProtocolTest.java           | 225 +++++++
 .../flink/runtime/rpc/TestingRpcService.java    |   6 +-
 .../runtime/rpc/TestingSerialRpcService.java    |   4 +
 21 files changed, 1677 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index ff1c4bf..fa3aabc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -68,6 +68,6 @@ public class ResourceProfile implements Serializable {
 	 * @return true if the requirement is matched, otherwise false
 	 */
 	public boolean isMatching(ResourceProfile required) {
-		return Double.compare(cpuCores, required.getCpuCores()) >= 0 && memoryInMB >= required.getMemoryInMB();
+		return required.getCpuCores() <= cpuCores && required.getMemoryInMB() <= memoryInMB; // a NaN cpu value never matches
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
index 8a6db5f..5fb8aee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -26,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique
  * identification and resource profile which we can compare to the resource request.
  */
-public class ResourceSlot implements ResourceIDRetrievable, Serializable {
+public class ResourceSlot implements ResourceIDRetrievable {
 
 	private static final long serialVersionUID = -5853720153136840674L;
 
@@ -36,9 +38,13 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable {
 	/** The resource profile of this slot */
 	private final ResourceProfile resourceProfile;
 
-	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
+	/** Gateway to the TaskExecutor which owns the slot; unlike slotId and resourceProfile it is not null-checked */
+	private final TaskExecutorGateway taskExecutorGateway;
+
+	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorGateway taskExecutorGateway) {
 		this.slotId = checkNotNull(slotId);
 		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskExecutorGateway = taskExecutorGateway; // stored as given, may be null
 	}
 
 	@Override
@@ -54,6 +60,10 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable {
 		return resourceProfile;
 	}
 
+	public TaskExecutorGateway getTaskExecutorGateway() { // gateway of the owning TaskExecutor, as passed to the constructor
+		return this.taskExecutorGateway;
+	}
+
 	/**
 	 * Check whether required resource profile can be matched by this slot.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 309dcc1..439e56b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,18 +18,26 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.JobID;
+
 import java.io.Serializable;
 
 public class JobMasterRegistration implements Serializable {
 	private static final long serialVersionUID = 8411214999193765202L;
 
 	private final String address;
+	private final JobID jobID; // job on whose behalf the JobMaster registers at the ResourceManager
 
-	public JobMasterRegistration(String address) {
+	public JobMasterRegistration(String address, JobID jobID) {
 		this.address = address;
+		this.jobID = jobID; // stored as given, not null-checked
 	}
 
 	public String getAddress() {
 		return address;
 	}
+
+	public JobID getJobID() { // the JobID passed to the constructor
+		return jobID;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
index fb6c401..796e634 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
@@ -18,26 +18,19 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.runtime.instance.InstanceID;
-
 import java.io.Serializable;
 
 public class RegistrationResponse implements Serializable {
 	private static final long serialVersionUID = -2379003255993119993L;
 
 	private final boolean isSuccess;
-	private final InstanceID instanceID;
 
-	public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+	public RegistrationResponse(boolean isSuccess) {
 		this.isSuccess = isSuccess;
-		this.instanceID = instanceID;
 	}
 
 	public boolean isSuccess() {
 		return isSuccess;
 	}
 
-	public InstanceID getInstanceID() {
-		return instanceID;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 44c022b..29aba1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.resourcemanager;
 import akka.dispatch.Mapper;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -51,16 +53,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+
+	private final Map<JobID, JobMasterGateway> jobMasterGateways;
+
 	private final HighAvailabilityServices highAvailabilityServices;
-	private LeaderElectionService leaderElectionService = null;
-	private UUID leaderSessionID = null;
 
-	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
+	private LeaderElectionService leaderElectionService;
+
+	private final SlotManager slotManager;
+
+	private UUID leaderSessionID;
+
+	public ResourceManager(
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.jobMasterGateways = new HashMap<>();
+		this.slotManager = checkNotNull(slotManager);
 	}
 
 	@Override
@@ -69,7 +81,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 		try {
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
-			leaderElectionService.start(new ResourceManagerLeaderContender());
+			leaderElectionService.start(this);
 		} catch (Throwable e) {
 			log.error("A fatal error happened when starting the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -94,7 +106,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 */
 	@VisibleForTesting
 	UUID getLeaderSessionID() {
-		return leaderSessionID;
+		return this.leaderSessionID;
 	}
 
 	/**
@@ -105,21 +117,20 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 */
 	@RpcMethod
 	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+		final Future<JobMasterGateway> jobMasterFuture =
+			getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+		final JobID jobID = jobMasterRegistration.getJobID();
 
 		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
 			@Override
 			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
-				InstanceID instanceID;
 
-				if (jobMasterGateways.containsKey(jobMasterGateway)) {
-					instanceID = jobMasterGateways.get(jobMasterGateway);
-				} else {
-					instanceID = new InstanceID();
-					jobMasterGateways.put(jobMasterGateway, instanceID);
+				final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
+				if (existingGateway != null) {
+					log.info("Replacing existing gateway {} for JobID {} with {}.",
+						existingGateway, jobID, jobMasterGateway);
 				}
-
-				return new RegistrationResponse(true, instanceID);
+				return new RegistrationResponse(true);
 			}
 		}, getMainThreadExecutionContext());
 	}
@@ -131,9 +142,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 * @return Slot assignment
 	 */
 	@RpcMethod
-	public SlotAssignment requestSlot(SlotRequest slotRequest) {
-		System.out.println("SlotRequest: " + slotRequest);
-		return new SlotAssignment();
+	public SlotRequestReply requestSlot(SlotRequest slotRequest) {
+		final JobID jobId = slotRequest.getJobId();
+		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+
+		if (jobMasterGateway != null) {
+			return slotManager.requestSlot(slotRequest);
+		} else {
+			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+			return new SlotRequestRejected(slotRequest.getAllocationId());
+		}
 	}
 
 
@@ -154,61 +172,62 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
 	}
 
-	private class ResourceManagerLeaderContender implements LeaderContender {
-
-		/**
-		 * Callback method when current resourceManager is granted leadership
-		 *
-		 * @param leaderSessionID unique leadershipID
-		 */
-		@Override
-		public void grantLeadership(final UUID leaderSessionID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-					ResourceManager.this.leaderSessionID = leaderSessionID;
-					// confirming the leader session ID might be blocking,
-					leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-				}
-			});
-		}
 
-		/**
-		 * Callback method when current resourceManager lose leadership.
-		 */
-		@Override
-		public void revokeLeadership() {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was revoked leadership.", getAddress());
-					jobMasterGateways.clear();
-					leaderSessionID = null;
-				}
-			});
-		}
+	// ------------------------------------------------------------------------
+	//  Leader Contender
+	// ------------------------------------------------------------------------
 
-		@Override
-		public String getAddress() {
-			return ResourceManager.this.getAddress();
-		}
+	/**
+	 * Callback method when the current ResourceManager is granted leadership.
+	 *
+	 * @param leaderSessionID unique leadershipID
+	 */
+	@Override
+	public void grantLeadership(final UUID leaderSessionID) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+				// confirming the leader session ID might be blocking
+				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+				// notify SlotManager
+				slotManager.notifyLeaderAddress(getAddress(), leaderSessionID);
+				ResourceManager.this.leaderSessionID = leaderSessionID;
+			}
+		});
+	}
 
-		/**
-		 * Handles error occurring in the leader election service
-		 *
-		 * @param exception Exception being thrown in the leader election service
-		 */
-		@Override
-		public void handleError(final Exception exception) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-					// terminate ResourceManager in case of an error
-					shutDown();
-				}
-			});
-		}
+	/**
+	 * Callback method when the current ResourceManager loses leadership.
+	 */
+	@Override
+	public void revokeLeadership() {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("ResourceManager {} was revoked leadership.", getAddress());
+				jobMasterGateways.clear();
+				ResourceManager.this.leaderSessionID = null;
+			}
+		});
+	}
+
+	/**
+	 * Handles errors occurring in the leader election service.
+	 *
+	 * @param exception Exception being thrown in the leader election service
+	 */
+	@Override
+	public void handleError(final Exception exception) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+				// notify SlotManager
+				slotManager.handleError(exception);
+				// terminate ResourceManager in case of an error
+				shutDown();
+			}
+		});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index b5782b0..e5c8b64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest Slot request
 	 * @return Future slot assignment
 	 */
-	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+	Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
 
 	/**
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
deleted file mode 100644
index 695204d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import java.io.Serializable;
-
-public class SlotAssignment implements Serializable{
-	private static final long serialVersionUID = -6990813455942742322L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
deleted file mode 100644
index 5c06648..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
- */
-public abstract class SlotManager {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
-
-	/** Gateway to communicate with ResourceManager */
-	private final ResourceManagerGateway resourceManagerGateway;
-
-	/** All registered slots, including free and allocated slots */
-	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
-
-	/** All pending slot requests, waiting available slots to fulfil */
-	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
-
-	/** All free slots that can be used to be allocated */
-	private final Map<SlotID, ResourceSlot> freeSlots;
-
-	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
-	private final AllocationMap allocationMap;
-
-	public SlotManager(ResourceManagerGateway resourceManagerGateway) {
-		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
-		this.registeredSlots = new HashMap<>(16);
-		this.pendingSlotRequests = new LinkedHashMap<>(16);
-		this.freeSlots = new HashMap<>(16);
-		this.allocationMap = new AllocationMap();
-	}
-
-	// ------------------------------------------------------------------------
-	//  slot managements
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container
-	 * allocation if we don't have enough resource. If we have free slot which can match the request, record
-	 * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
-	 * RPC's main thread to avoid race condition).
-	 *
-	 * @param request The detailed request of the slot
-	 */
-	public void requestSlot(final SlotRequest request) {
-		if (isRequestDuplicated(request)) {
-			LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
-			return;
-		}
-
-		// try to fulfil the request with current free slots
-		ResourceSlot slot = chooseSlotToUse(request, freeSlots);
-		if (slot != null) {
-			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-				request.getAllocationId(), request.getJobId());
-
-			// record this allocation in bookkeeping
-			allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
-
-			// remove selected slot from free pool
-			freeSlots.remove(slot.getSlotId());
-
-			// TODO: send slot request to TaskManager
-		} else {
-			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
-				"AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId());
-			allocateContainer(request.getResourceProfile());
-			pendingSlotRequests.put(request.getAllocationId(), request);
-		}
-	}
-
-	/**
-	 * Sync slot status with TaskManager's SlotReport.
-	 */
-	public void updateSlotStatus(final SlotReport slotReport) {
-		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-			updateSlotStatus(slotStatus);
-		}
-	}
-
-	/**
-	 * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
-	 * or really rejected by TaskManager. We shall retry this request by:
-	 * <ul>
-	 * <li>1. verify and clear all the previous allocate information for this request
-	 * <li>2. try to request slot again
-	 * </ul>
-	 * <p>
-	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
-	 * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
-	 * but it can be taken care of by rejecting registration at JobManager.
-	 *
-	 * @param originalRequest The original slot request
-	 * @param slotId          The target SlotID
-	 */
-	public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
-		final AllocationID originalAllocationId = originalRequest.getAllocationId();
-		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
-			slotId, originalAllocationId, originalRequest.getJobId());
-
-		// verify the allocation info before we do anything
-		if (freeSlots.containsKey(slotId)) {
-			// this slot is currently empty, no need to de-allocate it from our allocations
-			LOG.info("Original slot is somehow empty, retrying this request");
-
-			// before retry, we should double check whether this request was allocated by some other ways
-			if (!allocationMap.isAllocated(originalAllocationId)) {
-				requestSlot(originalRequest);
-			} else {
-				LOG.info("The failed request has somehow been allocated, SlotID:{}",
-					allocationMap.getSlotID(originalAllocationId));
-			}
-		} else if (allocationMap.isAllocated(slotId)) {
-			final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
-
-			// check whether we have an agreement on whom this slot belongs to
-			if (originalAllocationId.equals(currentAllocationId)) {
-				LOG.info("De-allocate this request and retry");
-				allocationMap.removeAllocation(currentAllocationId);
-
-				// put this slot back to free pool
-				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
-				freeSlots.put(slotId, slot);
-
-				// retry the request
-				requestSlot(originalRequest);
-			} else {
-				// the slot is taken by someone else, no need to de-allocate it from our allocations
-				LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
-
-				// before retry, we should double check whether this request was allocated by some other ways
-				if (!allocationMap.isAllocated(originalAllocationId)) {
-					requestSlot(originalRequest);
-				} else {
-					LOG.info("The failed request is somehow been allocated, SlotID:{}",
-						allocationMap.getSlotID(originalAllocationId));
-				}
-			}
-		} else {
-			LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
-		}
-	}
-
-	/**
-	 * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
-	 *
-	 * @param resourceId The ResourceID of the TaskManager
-	 */
-	public void notifyTaskManagerFailure(final ResourceID resourceId) {
-		LOG.info("Resource:{} been notified failure", resourceId);
-		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
-		if (slotIdsToRemove != null) {
-			for (SlotID slotId : slotIdsToRemove.keySet()) {
-				LOG.info("Removing Slot:{} upon resource failure", slotId);
-				if (freeSlots.containsKey(slotId)) {
-					freeSlots.remove(slotId);
-				} else if (allocationMap.isAllocated(slotId)) {
-					allocationMap.removeAllocation(slotId);
-				} else {
-					LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  internal behaviors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
-	 * <ul>
-	 * <li>1. The slot is newly registered.</li>
-	 * <li>2. The slot has registered, it contains its current status.</li>
-	 * </ul>
-	 * <p>
-	 * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
-	 * <p>
-	 * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
-	 * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet,
-	 * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
-	 * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
-	 * and take next action based on the diff between our information and heartbeat status.
-	 *
-	 * @param reportedStatus Reported slot status
-	 */
-	void updateSlotStatus(final SlotStatus reportedStatus) {
-		final SlotID slotId = reportedStatus.getSlotID();
-		final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler());
-
-		if (registerNewSlot(slot)) {
-			// we have a newly registered slot
-			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
-
-			if (reportedStatus.getAllocationID() != null) {
-				// slot in use, record this in bookkeeping
-				allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
-			} else {
-				handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
-			}
-		} else {
-			// slot exists, update current information
-			if (reportedStatus.getAllocationID() != null) {
-				// slot is reported in use
-				final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
-
-				// check whether we also thought this slot is in use
-				if (allocationMap.isAllocated(slotId)) {
-					// we also think that slot is in use, check whether the AllocationID matches
-					final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
-
-					if (!reportedAllocationId.equals(currentAllocationId)) {
-						LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
-							slotId, currentAllocationId, reportedAllocationId);
-
-						// seems we have a disagreement about the slot assignments, need to correct it
-						allocationMap.removeAllocation(slotId);
-						allocationMap.addAllocation(slotId, reportedAllocationId);
-					}
-				} else {
-					LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
-						slotId, reportedAllocationId);
-
-					// we thought the slot is free, should correct this information
-					allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
-
-					// remove this slot from free slots pool
-					freeSlots.remove(slotId);
-				}
-			} else {
-				// slot is reported empty
-
-				// check whether we also thought this slot is empty
-				if (allocationMap.isAllocated(slotId)) {
-					LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
-						slotId, allocationMap.getAllocationID(slotId));
-
-					// we thought the slot is in use, correct it
-					allocationMap.removeAllocation(slotId);
-
-					// we have a free slot!
-					handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
-				}
-			}
-		}
-	}
-
-	/**
-	 * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
-	 * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
-	 * to the free pool.
-	 *
-	 * @param freeSlot The free slot
-	 */
-	private void handleFreeSlot(final ResourceSlot freeSlot) {
-		SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
-
-		if (chosenRequest != null) {
-			pendingSlotRequests.remove(chosenRequest.getAllocationId());
-
-			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
-				chosenRequest.getAllocationId(), chosenRequest.getJobId());
-			allocationMap.addAllocation(freeSlot.getSlotId(), chosenRequest.getAllocationId());
-
-			// TODO: send slot request to TaskManager
-		} else {
-			freeSlots.put(freeSlot.getSlotId(), freeSlot);
-		}
-	}
-
-	/**
-	 * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
-	 * formerly received slot request, it is either in pending list or already been allocated.
-	 *
-	 * @param request The slot request
-	 * @return <tt>true</tt> if the request is duplicated
-	 */
-	private boolean isRequestDuplicated(final SlotRequest request) {
-		final AllocationID allocationId = request.getAllocationId();
-		return pendingSlotRequests.containsKey(allocationId)
-			|| allocationMap.isAllocated(allocationId);
-	}
-
-	/**
-	 * Try to register slot, and tell if this slot is newly registered.
-	 *
-	 * @param slot The ResourceSlot which will be checked and registered
-	 * @return <tt>true</tt> if we meet a new slot
-	 */
-	private boolean registerNewSlot(final ResourceSlot slot) {
-		final SlotID slotId = slot.getSlotId();
-		final ResourceID resourceId = slotId.getResourceID();
-		if (!registeredSlots.containsKey(resourceId)) {
-			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
-		}
-		return registeredSlots.get(resourceId).put(slotId, slot) == null;
-	}
-
-	private ResourceSlot getRegisteredSlot(final SlotID slotId) {
-		final ResourceID resourceId = slotId.getResourceID();
-		if (!registeredSlots.containsKey(resourceId)) {
-			return null;
-		}
-		return registeredSlots.get(resourceId).get(slotId);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Framework specific behavior
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Choose a slot to use among all free slots, the behavior is framework specified.
-	 *
-	 * @param request   The slot request
-	 * @param freeSlots All slots which can be used
-	 * @return The slot we choose to use, <tt>null</tt> if we did not find a match
-	 */
-	protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
-		final Map<SlotID, ResourceSlot> freeSlots);
-
-	/**
-	 * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified.
-	 *
-	 * @param offeredSlot     The free slot
-	 * @param pendingRequests All the pending slot requests
-	 * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
-	 */
-	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
-		final Map<AllocationID, SlotRequest> pendingRequests);
-
-	/**
-	 * The framework specific code for allocating a container for specified resource profile.
-	 *
-	 * @param resourceProfile The resource profile
-	 */
-	protected abstract void allocateContainer(final ResourceProfile resourceProfile);
-
-
-	// ------------------------------------------------------------------------
-	//  Helper classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
-	 * either by SlotID or AllocationID.
-	 */
-	private static class AllocationMap {
-
-		/** All allocated slots (by SlotID) */
-		private final Map<SlotID, AllocationID> allocatedSlots;
-
-		/** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */
-		private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
-
-		AllocationMap() {
-			this.allocatedSlots = new HashMap<>(16);
-			this.allocatedSlotsByAllocationId = new HashMap<>(16);
-		}
-
-		/**
-		 * Add a allocation
-		 *
-		 * @param slotId       The slot id
-		 * @param allocationId The allocation id
-		 */
-		void addAllocation(final SlotID slotId, final AllocationID allocationId) {
-			allocatedSlots.put(slotId, allocationId);
-			allocatedSlotsByAllocationId.put(allocationId, slotId);
-		}
-
-		/**
-		 * De-allocation with slot id
-		 *
-		 * @param slotId The slot id
-		 */
-		void removeAllocation(final SlotID slotId) {
-			if (allocatedSlots.containsKey(slotId)) {
-				final AllocationID allocationId = allocatedSlots.get(slotId);
-				allocatedSlots.remove(slotId);
-				allocatedSlotsByAllocationId.remove(allocationId);
-			}
-		}
-
-		/**
-		 * De-allocation with allocation id
-		 *
-		 * @param allocationId The allocation id
-		 */
-		void removeAllocation(final AllocationID allocationId) {
-			if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
-				SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
-				allocatedSlotsByAllocationId.remove(allocationId);
-				allocatedSlots.remove(slotId);
-			}
-		}
-
-		/**
-		 * Check whether allocation exists by slot id
-		 *
-		 * @param slotId The slot id
-		 * @return true if the allocation exists
-		 */
-		boolean isAllocated(final SlotID slotId) {
-			return allocatedSlots.containsKey(slotId);
-		}
-
-		/**
-		 * Check whether allocation exists by allocation id
-		 *
-		 * @param allocationId The allocation id
-		 * @return true if the allocation exists
-		 */
-		boolean isAllocated(final AllocationID allocationId) {
-			return allocatedSlotsByAllocationId.containsKey(allocationId);
-		}
-
-		AllocationID getAllocationID(final SlotID slotId) {
-			return allocatedSlots.get(slotId);
-		}
-
-		SlotID getSlotID(final AllocationID allocationId) {
-			return allocatedSlotsByAllocationId.get(allocationId);
-		}
-
-		public int size() {
-			return allocatedSlots.size();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Testing utilities
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	boolean isAllocated(final SlotID slotId) {
-		return allocationMap.isAllocated(slotId);
-	}
-
-	@VisibleForTesting
-	boolean isAllocated(final AllocationID allocationId) {
-		return allocationMap.isAllocated(allocationId);
-	}
-
-	/**
-	 * Add free slots directly to the free pool, this will not trigger pending requests allocation
-	 *
-	 * @param slot The resource slot
-	 */
-	@VisibleForTesting
-	void addFreeSlot(final ResourceSlot slot) {
-		final ResourceID resourceId = slot.getResourceID();
-		final SlotID slotId = slot.getSlotId();
-
-		if (!registeredSlots.containsKey(resourceId)) {
-			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
-		}
-		registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
-		freeSlots.put(slotId, slot);
-	}
-
-	@VisibleForTesting
-	int getAllocatedSlotCount() {
-		return allocationMap.size();
-	}
-
-	@VisibleForTesting
-	int getFreeSlotCount() {
-		return freeSlots.size();
-	}
-
-	@VisibleForTesting
-	int getPendingRequestCount() {
-		return pendingSlotRequests.size();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
new file mode 100644
index 0000000..6b7f6dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager.
+ * <p>
+ * The acknowledgment only states that the request has been accepted; it does not imply
+ * that a slot has already been assigned to it (the request may still be pending).
+ */
+public class SlotRequestRegistered extends SlotRequestReply {
+
+	private static final long serialVersionUID = 42L;
+
+	/**
+	 * @param allocationID id of the slot request that has been registered
+	 */
+	public SlotRequestRegistered(AllocationID allocationID) {
+		super(allocationID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
new file mode 100644
index 0000000..cb3ec72
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Rejection message by the ResourceManager for a SlotRequest from the JobManager.
+ */
+public class SlotRequestRejected extends SlotRequestReply {
+
+	private static final long serialVersionUID = 42L;
+
+	/**
+	 * @param allocationID id of the slot request that has been rejected
+	 */
+	public SlotRequestRejected(AllocationID allocationID) {
+		super(allocationID);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
new file mode 100644
index 0000000..1b85d0c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for the replies of the ResourceManager to a SlotRequest from the JobManager.
+ * A reply is either a {@link SlotRequestRegistered} acknowledgment or a {@link SlotRequestRejected}
+ * rejection, and carries the AllocationID of the request it answers.
+ */
+public abstract class SlotRequestReply implements Serializable {
+
+	private static final long serialVersionUID = 42;
+
+	/** The AllocationID of the slot request this reply refers to. */
+	private final AllocationID allocationID;
+
+	/**
+	 * Creates a reply for the slot request with the given AllocationID.
+	 *
+	 * @param allocationID id of the slot request this reply answers
+	 */
+	public SlotRequestReply(AllocationID allocationID) {
+		this.allocationID = allocationID;
+	}
+
+	/**
+	 * @return the AllocationID of the slot request this reply answers
+	 */
+	public AllocationID getAllocationID() {
+		return allocationID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
new file mode 100644
index 0000000..ef5ce31
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A simple SlotManager which ignores resource profiles.
+ * <p>
+ * Slots and requests are matched purely by availability: the first free slot in the pool's
+ * iteration order is handed out, and the first pending request in iteration order is served.
+ */
+public class SimpleSlotManager extends SlotManager {
+
+	@Override
+	protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+		// the request's resource profile is deliberately not inspected
+		final Iterator<ResourceSlot> candidates = freeSlots.values().iterator();
+		return candidates.hasNext() ? candidates.next() : null;
+	}
+
+	@Override
+	protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
+		// the offered slot's resource profile is deliberately not inspected
+		final Iterator<SlotRequest> candidates = pendingRequests.values().iterator();
+		return candidates.hasNext() ? candidates.next() : null;
+	}
+
+	@Override
+	protected void allocateContainer(ResourceProfile resourceProfile) {
+		// TODO
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
new file mode 100644
index 0000000..96fde7d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
+ * slots from registered TaskManagers and issues container allocation requests in case of there are not
+ * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
+ * <p>
+ * The main operation principle of SlotManager is:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
+ * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
+ * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
+ * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ */
+public abstract class SlotManager implements LeaderRetrievalListener {
+
+	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+	/** All registered task managers with ResourceID and gateway. */
+	private final Map<ResourceID, TaskExecutorGateway> taskManagerGateways;
+
+	/** All registered slots, including free and allocated slots */
+	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
+
+	/** All pending slot requests, waiting available slots to fulfil */
+	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+	/** All free slots that can be used to be allocated */
+	private final Map<SlotID, ResourceSlot> freeSlots;
+
+	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
+	private final AllocationMap allocationMap;
+
+	private final FiniteDuration timeout;
+
+	/** The current leader id set by the ResourceManager */
+	private UUID leaderID;
+
+	public SlotManager() {
+		this.registeredSlots = new HashMap<>(16);
+		this.pendingSlotRequests = new LinkedHashMap<>(16);
+		this.freeSlots = new HashMap<>(16);
+		this.allocationMap = new AllocationMap();
+		this.taskManagerGateways = new HashMap<>();
+		this.timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  slot managements
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container
+	 * allocation if we don't have enough resource. If we have free slot which can match the request, record
+	 * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
+	 * RPC's main thread to avoid race condition).
+	 *
+	 * @param request The detailed request of the slot
+	 * @return SlotRequestRegistered The confirmation message to be send to the caller
+	 */
+	public SlotRequestRegistered requestSlot(final SlotRequest request) {
+		final AllocationID allocationId = request.getAllocationId();
+		if (isRequestDuplicated(request)) {
+			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+			return null;
+		}
+
+		// try to fulfil the request with current free slots
+		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+		if (slot != null) {
+			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
+				allocationId, request.getJobId());
+
+			// record this allocation in bookkeeping
+			allocationMap.addAllocation(slot.getSlotId(), allocationId);
+
+			// remove selected slot from free pool
+			freeSlots.remove(slot.getSlotId());
+
+			final Future<SlotRequestReply> slotRequestReplyFuture =
+				slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
+			// TODO handle timeouts and response
+		} else {
+			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
+				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
+			allocateContainer(request.getResourceProfile());
+			pendingSlotRequests.put(allocationId, request);
+		}
+
+		return new SlotRequestRegistered(allocationId);
+	}
+
+	/**
+	 * Sync slot status with TaskManager's SlotReport.
+	 */
+	public void updateSlotStatus(final SlotReport slotReport) {
+		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+			updateSlotStatus(slotStatus);
+		}
+	}
+
+	/**
+	 * Registers a TaskExecutor
+	 * @param resourceID TaskExecutor's ResourceID
+	 * @param gateway TaskExcutor's gateway
+	 */
+	public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
+		this.taskManagerGateways.put(resourceID, gateway);
+	}
+
+	/**
+	 * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
+	 * or really rejected by TaskManager. We shall retry this request by:
+	 * <ul>
+	 * <li>1. verify and clear all the previous allocate information for this request
+	 * <li>2. try to request slot again
+	 * </ul>
+	 * <p>
+	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
+	 * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
+	 * but it can be taken care of by rejecting registration at JobManager.
+	 *
+	 * @param originalRequest The original slot request
+	 * @param slotId          The target SlotID
+	 */
+	public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
+		final AllocationID originalAllocationId = originalRequest.getAllocationId();
+		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
+			slotId, originalAllocationId, originalRequest.getJobId());
+
+		// verify the allocation info before we do anything
+		if (freeSlots.containsKey(slotId)) {
+			// this slot is currently empty, no need to de-allocate it from our allocations
+			LOG.info("Original slot is somehow empty, retrying this request");
+
+			// before retry, we should double check whether this request was allocated by some other ways
+			if (!allocationMap.isAllocated(originalAllocationId)) {
+				requestSlot(originalRequest);
+			} else {
+				LOG.info("The failed request has somehow been allocated, SlotID:{}",
+					allocationMap.getSlotID(originalAllocationId));
+			}
+		} else if (allocationMap.isAllocated(slotId)) {
+			final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+			// check whether we have an agreement on whom this slot belongs to
+			if (originalAllocationId.equals(currentAllocationId)) {
+				LOG.info("De-allocate this request and retry");
+				allocationMap.removeAllocation(currentAllocationId);
+
+				// put this slot back to free pool
+				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
+				freeSlots.put(slotId, slot);
+
+				// retry the request
+				requestSlot(originalRequest);
+			} else {
+				// the slot is taken by someone else, no need to de-allocate it from our allocations
+				LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
+
+				// before retry, we should double check whether this request was allocated by some other ways
+				if (!allocationMap.isAllocated(originalAllocationId)) {
+					requestSlot(originalRequest);
+				} else {
+					LOG.info("The failed request is somehow been allocated, SlotID:{}",
+						allocationMap.getSlotID(originalAllocationId));
+				}
+			}
+		} else {
+			LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+		}
+	}
+
+	/**
+	 * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
+	 *
+	 * @param resourceId The ResourceID of the TaskManager
+	 */
+	public void notifyTaskManagerFailure(final ResourceID resourceId) {
+		LOG.info("Resource:{} been notified failure", resourceId);
+		taskManagerGateways.remove(resourceId);
+		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
+		if (slotIdsToRemove != null) {
+			for (SlotID slotId : slotIdsToRemove.keySet()) {
+				LOG.info("Removing Slot: {} upon resource failure", slotId);
+				if (freeSlots.containsKey(slotId)) {
+					freeSlots.remove(slotId);
+				} else if (allocationMap.isAllocated(slotId)) {
+					allocationMap.removeAllocation(slotId);
+				} else {
+					LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal behaviors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
+	 * <ul>
+	 * <li>1. The slot is newly registered.</li>
+	 * <li>2. The slot has registered, it contains its current status.</li>
+	 * </ul>
+	 * <p>
+	 * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
+	 * <p>
+	 * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
+	 * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet,
+	 * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
+	 * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
+	 * and take next action based on the diff between our information and heartbeat status.
+	 *
+	 * @param reportedStatus Reported slot status
+	 */
+	void updateSlotStatus(final SlotStatus reportedStatus) {
+		final SlotID slotId = reportedStatus.getSlotID();
+
+		final TaskExecutorGateway taskExecutorGateway = taskManagerGateways.get(slotId.getResourceID());
+		if (taskExecutorGateway == null) {
+			LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
+				slotId.getResourceID());
+			return;
+		}
+
+		final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler(), taskExecutorGateway);
+
+		if (registerNewSlot(slot)) {
+			// we have a newly registered slot
+			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
+
+			if (reportedStatus.getAllocationID() != null) {
+				// slot in use, record this in bookkeeping
+				allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+			} else {
+				handleFreeSlot(slot);
+			}
+		} else {
+			// slot exists, update current information
+			if (reportedStatus.getAllocationID() != null) {
+				// slot is reported in use
+				final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
+
+				// check whether we also thought this slot is in use
+				if (allocationMap.isAllocated(slotId)) {
+					// we also think that slot is in use, check whether the AllocationID matches
+					final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+					if (!reportedAllocationId.equals(currentAllocationId)) {
+						LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
+							slotId, currentAllocationId, reportedAllocationId);
+
+						// seems we have a disagreement about the slot assignments, need to correct it
+						allocationMap.removeAllocation(slotId);
+						allocationMap.addAllocation(slotId, reportedAllocationId);
+					}
+				} else {
+					LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
+						slotId, reportedAllocationId);
+
+					// we thought the slot is free, should correct this information
+					allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+
+					// remove this slot from free slots pool
+					freeSlots.remove(slotId);
+				}
+			} else {
+				// slot is reported empty
+
+				// check whether we also thought this slot is empty
+				if (allocationMap.isAllocated(slotId)) {
+					LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
+						slotId, allocationMap.getAllocationID(slotId));
+
+					// we thought the slot is in use, correct it
+					allocationMap.removeAllocation(slotId);
+
+					// we have a free slot!
+					handleFreeSlot(slot);
+				}
+			}
+		}
+	}
+
+	/**
+	 * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
+	 * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
+	 * to the free pool.
+	 *
+	 * @param freeSlot The free slot
+	 */
+	private void handleFreeSlot(final ResourceSlot freeSlot) {
+		SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+
+		if (chosenRequest != null) {
+			final AllocationID allocationId = chosenRequest.getAllocationId();
+			pendingSlotRequests.remove(allocationId);
+
+			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
+				allocationId, chosenRequest.getJobId());
+			allocationMap.addAllocation(freeSlot.getSlotId(), allocationId);
+
+			final Future<SlotRequestReply> slotRequestReplyFuture =
+				freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
+			// TODO handle timeouts and response
+		} else {
+			freeSlots.put(freeSlot.getSlotId(), freeSlot);
+		}
+	}
+
+	/**
+	 * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
+	 * formerly received slot request, it is either in pending list or already been allocated.
+	 *
+	 * @param request The slot request
+	 * @return <tt>true</tt> if the request is duplicated
+	 */
+	private boolean isRequestDuplicated(final SlotRequest request) {
+		final AllocationID allocationId = request.getAllocationId();
+		return pendingSlotRequests.containsKey(allocationId)
+			|| allocationMap.isAllocated(allocationId);
+	}
+
+	/**
+	 * Try to register slot, and tell if this slot is newly registered.
+	 *
+	 * @param slot The ResourceSlot which will be checked and registered
+	 * @return <tt>true</tt> if we meet a new slot
+	 */
+	private boolean registerNewSlot(final ResourceSlot slot) {
+		final SlotID slotId = slot.getSlotId();
+		final ResourceID resourceId = slotId.getResourceID();
+		if (!registeredSlots.containsKey(resourceId)) {
+			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+		}
+		return registeredSlots.get(resourceId).put(slotId, slot) == null;
+	}
+
+	private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+		final ResourceID resourceId = slotId.getResourceID();
+		if (!registeredSlots.containsKey(resourceId)) {
+			return null;
+		}
+		return registeredSlots.get(resourceId).get(slotId);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Framework specific behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Choose a slot to use among all free slots, the behavior is framework specified.
+	 *
+	 * @param request   The slot request
+	 * @param freeSlots All slots which can be used
+	 * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+	 */
+	protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
+		final Map<SlotID, ResourceSlot> freeSlots);
+
+	/**
+	 * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified.
+	 *
+	 * @param offeredSlot     The free slot
+	 * @param pendingRequests All the pending slot requests
+	 * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+	 */
+	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
+		final Map<AllocationID, SlotRequest> pendingRequests);
+
+	/**
+	 * The framework specific code for allocating a container for specified resource profile.
+	 *
+	 * @param resourceProfile The resource profile
+	 */
+	protected abstract void allocateContainer(final ResourceProfile resourceProfile);
+
+	// ------------------------------------------------------------------------
+	//  Helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
+	 * either by SlotID or AllocationID.
+	 */
+	private static class AllocationMap {
+
+		/** All allocated slots (by SlotID) */
+		private final Map<SlotID, AllocationID> allocatedSlots;
+
+		/** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */
+		private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+
+		AllocationMap() {
+			this.allocatedSlots = new HashMap<>(16);
+			this.allocatedSlotsByAllocationId = new HashMap<>(16);
+		}
+
+		/**
+		 * Add a allocation
+		 *
+		 * @param slotId       The slot id
+		 * @param allocationId The allocation id
+		 */
+		void addAllocation(final SlotID slotId, final AllocationID allocationId) {
+			allocatedSlots.put(slotId, allocationId);
+			allocatedSlotsByAllocationId.put(allocationId, slotId);
+		}
+
+		/**
+		 * De-allocation with slot id
+		 *
+		 * @param slotId The slot id
+		 */
+		void removeAllocation(final SlotID slotId) {
+			if (allocatedSlots.containsKey(slotId)) {
+				final AllocationID allocationId = allocatedSlots.get(slotId);
+				allocatedSlots.remove(slotId);
+				allocatedSlotsByAllocationId.remove(allocationId);
+			}
+		}
+
+		/**
+		 * De-allocation with allocation id
+		 *
+		 * @param allocationId The allocation id
+		 */
+		void removeAllocation(final AllocationID allocationId) {
+			if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
+				SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
+				allocatedSlotsByAllocationId.remove(allocationId);
+				allocatedSlots.remove(slotId);
+			}
+		}
+
+		/**
+		 * Check whether allocation exists by slot id
+		 *
+		 * @param slotId The slot id
+		 * @return true if the allocation exists
+		 */
+		boolean isAllocated(final SlotID slotId) {
+			return allocatedSlots.containsKey(slotId);
+		}
+
+		/**
+		 * Check whether allocation exists by allocation id
+		 *
+		 * @param allocationId The allocation id
+		 * @return true if the allocation exists
+		 */
+		boolean isAllocated(final AllocationID allocationId) {
+			return allocatedSlotsByAllocationId.containsKey(allocationId);
+		}
+
+		AllocationID getAllocationID(final SlotID slotId) {
+			return allocatedSlots.get(slotId);
+		}
+
+		SlotID getSlotID(final AllocationID allocationId) {
+			return allocatedSlotsByAllocationId.get(allocationId);
+		}
+
+		public int size() {
+			return allocatedSlots.size();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  High availability
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		this.leaderID = leaderSessionID;
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Slot Manager received an error from the leader service", exception);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Testing utilities
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	boolean isAllocated(final SlotID slotId) {
+		return allocationMap.isAllocated(slotId);
+	}
+
+	@VisibleForTesting
+	boolean isAllocated(final AllocationID allocationId) {
+		return allocationMap.isAllocated(allocationId);
+	}
+
+	/**
+	 * Add free slots directly to the free pool, this will not trigger pending requests allocation
+	 *
+	 * @param slot The resource slot
+	 */
+	@VisibleForTesting
+	void addFreeSlot(final ResourceSlot slot) {
+		final ResourceID resourceId = slot.getResourceID();
+		final SlotID slotId = slot.getSlotId();
+
+		if (!registeredSlots.containsKey(resourceId)) {
+			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+		}
+		registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
+		freeSlots.put(slotId, slot);
+	}
+
+	@VisibleForTesting
+	int getAllocatedSlotCount() {
+		return allocationMap.size();
+	}
+
+	@VisibleForTesting
+	int getFreeSlotCount() {
+		return freeSlots.size();
+	}
+
+	@VisibleForTesting
+	int getPendingRequestCount() {
+		return pendingSlotRequests.size();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 744b674..0f57bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -50,7 +50,10 @@ public class SlotStatus implements Serializable {
 		this(slotID, profiler, null, null);
 	}
 
-	public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
+	public SlotStatus(
+			SlotID slotID, ResourceProfile profiler,
+			JobID jobID,
+			AllocationID allocationID) {
 		this.slotID = checkNotNull(slotID, "slotID cannot be null");
 		this.profiler = checkNotNull(profiler, "profile cannot be null");
 		this.allocationID = allocationID;

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 6c99706..7257436 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,7 +18,12 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
 
@@ -32,4 +37,16 @@ public interface TaskExecutorGateway extends RpcGateway {
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Notifies the TaskExecutor of a (new) ResourceManager leader.
 	 *
 	 * @param address address of the ResourceManager leader (presumably its RPC address; TODO confirm)
 	 * @param resourceManagerLeaderId leader id of the ResourceManager leader
 	 */
 	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
+
+	/**
+	 * Sent by the ResourceManager to the TaskExecutor to request a slot.
+	 *
+	 * @param allocationID id for the request
+	 * @param resourceManagerLeaderID current leader id of the ResourceManager
+	 * @param timeout timeout for the RPC call (marked with {@link RpcTimeout})
+	 * @return future holding the {@link SlotRequestReply} answer to the request
+	 */
+	Future<SlotRequestReply> requestSlot(
+		AllocationID allocationID,
+		UUID resourceManagerLeaderID,
+		@RpcTimeout FiniteDuration timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d159de62/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 5799e62..8183c0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -53,7 +54,8 @@ public class ResourceManagerHATest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
-		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+		SlotManager slotManager = mock(SlotManager.class);
+		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
 		Assert.assertNull(resourceManager.getLeaderSessionID());