You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:40:43 UTC

[16/50] [abbrv] flink git commit: [FLINK-4347][FLINK-4348] simplify SlotManager and integrate it with ResourceManager

http://git-wip-us.apache.org/repos/asf/flink/blob/8adceede/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 8f09152..14afd0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -25,10 +25,8 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -160,7 +158,7 @@ public class ResourceManagerJobMasterTest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
-		ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		ResourceManager resourceManager = new TestingResourceManager(rpcService, highAvailabilityServices);
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8adceede/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index e6d1ed5..a577c26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -22,10 +22,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.junit.After;
@@ -44,9 +43,24 @@ public class ResourceManagerTaskExecutorTest {
 
 	private TestingSerialRpcService rpcService;
 
+	private SlotReport slotReport = new SlotReport();
+
+	private static String taskExecutorAddress = "/taskExecutor1";
+
+	private ResourceID taskExecutorResourceID;
+
+	private StandaloneResourceManager resourceManager;
+
+	private UUID leaderSessionId;
+
 	@Before
 	public void setup() throws Exception {
 		rpcService = new TestingSerialRpcService();
+
+		taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		leaderSessionId = grantLeadership(rmLeaderElectionService);
 	}
 
 	@After
@@ -59,19 +73,15 @@ public class ResourceManagerTaskExecutorTest {
 	 */
 	@Test
 	public void testRegisterTaskExecutor() throws Exception {
-		String taskExecutorAddress = "/taskExecutor1";
-		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
-		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
-		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
-		final UUID leaderSessionId = grantLeadership(rmLeaderElectionService);
-
 		// test response successful
-		Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
+		Future<RegistrationResponse> successfulFuture =
+			resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
 		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 		assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 		// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
-		Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
+		Future<RegistrationResponse> duplicateFuture =
+			resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
 		RegistrationResponse duplicateResponse = duplicateFuture.get();
 		assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 		assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -82,15 +92,10 @@ public class ResourceManagerTaskExecutorTest {
 	 */
 	@Test
 	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
-		String taskExecutorAddress = "/taskExecutor1";
-		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
-		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
-		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
-		final UUID leaderSessionId = grantLeadership(rmLeaderElectionService);
-
 		// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
+		Future<RegistrationResponse> unMatchedLeaderFuture =
+			resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 	}
 
@@ -99,15 +104,10 @@ public class ResourceManagerTaskExecutorTest {
 	 */
 	@Test
 	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
-		String taskExecutorAddress = "/taskExecutor1";
-		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
-		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-		final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
-		final UUID leaderSessionId = grantLeadership(leaderElectionService);
-
 		// test throw exception when receive a registration from taskExecutor which takes invalid address
 		String invalidAddress = "/taskExecutor2";
-		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID);
+		Future<RegistrationResponse> invalidAddressFuture =
+			resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
 		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 	}
 
@@ -118,10 +118,11 @@ public class ResourceManagerTaskExecutorTest {
 		return taskExecutorResourceID;
 	}
 
-	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
+	private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-		ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		StandaloneResourceManager resourceManager =
+			new TestingResourceManager(rpcService, highAvailabilityServices);
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8adceede/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
new file mode 100644
index 0000000..6b4ca14
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+
+public class TestingResourceManager extends StandaloneResourceManager {
+
+	public TestingResourceManager(RpcService rpcService) {
+		this(rpcService, new TestingHighAvailabilityServices());
+	}
+
+	public TestingResourceManager(
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices) {
+		this(rpcService, highAvailabilityServices, new TestingSlotManagerFactory());
+	}
+
+	public TestingResourceManager(
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManagerFactory slotManagerFactory) {
+		super(rpcService, highAvailabilityServices, slotManagerFactory);
+	}
+
+	private static class TestingSlotManagerFactory implements SlotManagerFactory {
+
+		@Override
+		public SlotManager create(ResourceManagerServices rmServices) {
+			return new TestingSlotManager(rmServices);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8adceede/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
new file mode 100644
index 0000000..0b2c42b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.mockito.Mockito;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+public class TestingSlotManager extends SlotManager {
+
+	public TestingSlotManager() {
+		this(new TestingResourceManagerServices());
+	}
+
+	public TestingSlotManager(ResourceManagerServices rmServices) {
+		super(rmServices);
+	}
+
+	@Override
+	protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+		final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
+		if (slotIterator.hasNext()) {
+			return slotIterator.next();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
+		final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
+		if (requestIterator.hasNext()) {
+			return requestIterator.next();
+		} else {
+			return null;
+		}
+	}
+
+	private static class TestingResourceManagerServices implements ResourceManagerServices {
+
+		@Override
+		public void allocateResource(ResourceProfile resourceProfile) {
+
+		}
+
+		@Override
+		public Executor getAsyncExecutor() {
+			return Mockito.mock(Executor.class);
+		}
+
+		@Override
+		public Executor getMainThreadExecutor() {
+			return Mockito.mock(Executor.class);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8adceede/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 0fed79e..0d2b40d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -28,13 +28,16 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +48,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
 
 public class SlotManagerTest {
 
@@ -59,13 +61,15 @@ public class SlotManagerTest {
 	private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
 		new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
 
-	private static TaskExecutorGateway taskExecutorGateway;
+	private static TaskExecutorRegistration taskExecutorRegistration;
 
 	@BeforeClass
 	public static void setUp() {
-		taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
-		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
-			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
+		taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class);
+		TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
+		Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway);
+		Mockito.when(gateway.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 	}
 
 	/**
@@ -180,9 +184,9 @@ public class SlotManagerTest {
 		assertEquals(1, slotManager.getPendingRequestCount());
 
 		SlotID slotId = SlotID.generate();
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
 		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
+		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
 
 		assertEquals(1, slotManager.getAllocatedSlotCount());
 		assertEquals(0, slotManager.getFreeSlotCount());
@@ -198,9 +202,9 @@ public class SlotManagerTest {
 		TestingSlotManager slotManager = new TestingSlotManager();
 
 		SlotID slotId = SlotID.generate();
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
 		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
+		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
 
 		assertEquals(0, slotManager.getAllocatedSlotCount());
 		assertEquals(1, slotManager.getFreeSlotCount());
@@ -216,9 +220,9 @@ public class SlotManagerTest {
 		assertEquals(1, slotManager.getPendingRequestCount());
 
 		SlotID slotId = SlotID.generate();
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
 		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
+		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
 
 		assertEquals(0, slotManager.getAllocatedSlotCount());
 		assertEquals(1, slotManager.getFreeSlotCount());
@@ -234,9 +238,9 @@ public class SlotManagerTest {
 		TestingSlotManager slotManager = new TestingSlotManager();
 
 		SlotID slotId = SlotID.generate();
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
 		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-		slotManager.updateSlotStatus(slotStatus);
+		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
 
 		assertEquals(1, slotManager.getAllocatedSlotCount());
 		assertEquals(0, slotManager.getFreeSlotCount());
@@ -244,48 +248,44 @@ public class SlotManagerTest {
 	}
 
 	/**
-	 * Tests that we had a slot in-use, and it's confirmed by SlotReport
+	 * Tests that we had a slot in-use and is freed again subsequently.
 	 */
 	@Test
 	public void testExistingInUseSlotUpdateStatus() {
 		TestingSlotManager slotManager = new TestingSlotManager();
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
 
-		// make this slot in use
 		SlotID slotId = SlotID.generate();
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
+		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
+		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
 
 		assertEquals(1, slotManager.getAllocatedSlotCount());
 		assertEquals(0, slotManager.getFreeSlotCount());
 		assertTrue(slotManager.isAllocated(slotId));
 
-		// slot status is confirmed
-		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
-			request.getJobId(), request.getAllocationId());
-		slotManager.updateSlotStatus(slotStatus2);
+		// slot is freed again
+		slotManager.notifySlotAvailable(slotId.getResourceID(), slotId);
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertFalse(slotManager.isAllocated(slotId));
 	}
 
 	/**
-	 * Tests that we had a slot in-use, but it's empty according to the SlotReport
+	 * Tests multiple slot requests with one slots.
 	 */
 	@Test
-	public void testExistingInUseSlotAdjustedToEmpty() {
+	public void testMultipleSlotRequestsWithOneSlot() {
 		TestingSlotManager slotManager = new TestingSlotManager();
-		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		final AllocationID allocationID = new AllocationID();
+
+		SlotRequest request1 = new SlotRequest(new JobID(), allocationID, DEFAULT_TESTING_PROFILE);
 		slotManager.requestSlot(request1);
 
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
+		final ResourceID resourceID = ResourceID.generate();
+		final SlotStatus slotStatus = new SlotStatus(new SlotID(resourceID, 0), DEFAULT_TESTING_PROFILE);
+		final SlotReport slotReport = new SlotReport(slotStatus);
+		slotManager.registerTaskExecutor(resourceID, taskExecutorRegistration, slotReport);
 
 		// another request pending
 		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
@@ -294,66 +294,20 @@ public class SlotManagerTest {
 		assertEquals(1, slotManager.getAllocatedSlotCount());
 		assertEquals(0, slotManager.getFreeSlotCount());
 		assertEquals(1, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
+		assertTrue(slotManager.isAllocated(allocationID));
 		assertTrue(slotManager.isAllocated(request1.getAllocationId()));
 
-
-		// but slot is reported empty again, request2 will be fulfilled, request1 will be missing
-		slotManager.updateSlotStatus(slotStatus);
+		// but slot is reported empty in a report in the meantime which shouldn't affect the state
+		slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID());
 
 		assertEquals(1, slotManager.getAllocatedSlotCount());
 		assertEquals(0, slotManager.getFreeSlotCount());
 		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
+		assertTrue(slotManager.isAllocated(slotStatus.getSlotID()));
 		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-	}
-
-	/**
-	 * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
-	 * information didn't match.
-	 */
-	@Test
-	public void testExistingInUseSlotWithDifferentAllocationInfo() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		// make this slot in use
-		SlotID slotId = SlotID.generate();
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway);
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
-		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-		// update slot status with different allocation info
-		slotManager.updateSlotStatus(slotStatus2);
-
-		// original request is missing and won't be allocated
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
-	}
-
-	/**
-	 * Tests that we had a free slot, and it's confirmed by SlotReport
-	 */
-	@Test
-	public void testExistingEmptySlotUpdateStatus() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-		slotManager.addFreeSlot(slot);
 
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
-		slotManager.updateSlotStatus(slotStatus);
+		// but slot is reported empty in a report in the meantime which shouldn't affect the state
+		slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID());
 
 		assertEquals(0, slotManager.getAllocatedSlotCount());
 		assertEquals(1, slotManager.getFreeSlotCount());
@@ -361,34 +315,12 @@ public class SlotManagerTest {
 	}
 
 	/**
-	 * Tests that we had a free slot, and it's reported in-use by TaskManager
-	 */
-	@Test
-	public void testExistingEmptySlotAdjustedToInUse() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		final SlotID slotID = SlotID.generate();
-		slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway);
-
-		ResourceSlot slot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-		slotManager.addFreeSlot(slot);
-
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
-			new JobID(), new AllocationID());
-		slotManager.updateSlotStatus(slotStatus);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slot.getSlotId()));
-	}
-
-	/**
 	 * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
 	 */
 	@Test
 	public void testSlotAllocationFailedAtTaskManager() {
 		TestingSlotManager slotManager = new TestingSlotManager();
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
 		slotManager.addFreeSlot(slot);
 
 		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
@@ -409,24 +341,31 @@ public class SlotManagerTest {
 
 	/**
 	 * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
+	 * This can only occur after reconnect of the TaskExecutor.
 	 */
 	@Test
 	public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
 		TestingSlotManager slotManager = new TestingSlotManager();
 		final SlotID slotID = SlotID.generate();
-		slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway);
-
-		ResourceSlot slot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-		slotManager.addFreeSlot(slot);
+		SlotStatus slot = new SlotStatus(slotID, DEFAULT_TESTING_PROFILE);
+		SlotReport slotReport = new SlotReport(slot);
+		slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport);
 
 		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
 		slotManager.requestSlot(request);
 
-		// slot is set empty by heartbeat
-		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
-		slotManager.updateSlotStatus(slotStatus);
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+
+		// slot is set empty by a reconnect of the TaskExecutor
+		slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport);
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
 
-		// another request took this slot
+		// another request takes the slot
 		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
 		slotManager.requestSlot(request2);
 
@@ -436,12 +375,12 @@ public class SlotManagerTest {
 		assertFalse(slotManager.isAllocated(request.getAllocationId()));
 		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
 
-		// original request should be pended
-		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+		// original request should be retried
+		slotManager.handleSlotRequestFailedAtTaskManager(request, slotID);
 
 		assertEquals(1, slotManager.getAllocatedSlotCount());
 		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
 		assertFalse(slotManager.isAllocated(request.getAllocationId()));
 		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
 	}
@@ -453,10 +392,10 @@ public class SlotManagerTest {
 		ResourceID resource1 = ResourceID.generate();
 		ResourceID resource2 = ResourceID.generate();
 
-		ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-		ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-		ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-		ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+		ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+		ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+		ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+		ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
 
 		slotManager.addFreeSlot(slot11);
 		slotManager.addFreeSlot(slot21);
@@ -499,7 +438,7 @@ public class SlotManagerTest {
 		final int freeSlotNum)
 	{
 		for (int i = 0; i < freeSlotNum; ++i) {
-			slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorGateway));
+			slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorRegistration));
 		}
 	}
 
@@ -507,13 +446,13 @@ public class SlotManagerTest {
 	//  testing classes
 	// ------------------------------------------------------------------------
 
-	private static class TestingSlotManager extends SlotManager implements ResourceManagerServices {
+	private static class TestingSlotManager extends SlotManager {
 
-		private final List<ResourceProfile> allocatedContainers;
+		private static TestingRmServices testingRmServices = new TestingRmServices();
 
 		TestingSlotManager() {
-			this.allocatedContainers = new LinkedList<>();
-			setupResourceManagerServices(this);
+			super(testingRmServices);
+			testingRmServices.allocatedContainers.clear();
 		}
 
 		/**
@@ -552,24 +491,34 @@ public class SlotManagerTest {
 			return null;
 		}
 
-		@Override
-		public void allocateResource(ResourceProfile resourceProfile) {
-			allocatedContainers.add(resourceProfile);
+		List<ResourceProfile> getAllocatedContainers() {
+			return testingRmServices.allocatedContainers;
 		}
 
-		@Override
-		public Executor getAsyncExecutor() {
-			return Mockito.mock(Executor.class);
-		}
 
-		@Override
-		public Executor getExecutor() {
-			return Mockito.mock(Executor.class);
-		}
+		private static class TestingRmServices implements ResourceManagerServices {
 
-		List<ResourceProfile> getAllocatedContainers() {
-			return allocatedContainers;
-		}
+			private List<ResourceProfile> allocatedContainers;
+
+			public TestingRmServices() {
+				this.allocatedContainers = new LinkedList<>();
+			}
+
+			@Override
+			public void allocateResource(ResourceProfile resourceProfile) {
+				allocatedContainers.add(resourceProfile);
+			}
 
+			@Override
+			public Executor getAsyncExecutor() {
+				return Mockito.mock(Executor.class);
+			}
+
+			@Override
+			public Executor getMainThreadExecutor() {
+				return Mockito.mock(Executor.class);
+			}
+
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8adceede/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a87fe42..24d959e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,12 +25,20 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.*;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.TestingSlotManager;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -94,9 +102,9 @@ public class SlotProtocolTest extends TestLogger {
 		TestingLeaderElectionService rmLeaderElectionService =
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
-		SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
-		ResourceManager resourceManager =
-			Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
+		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+		SpiedResourceManager resourceManager =
+			new SpiedResourceManager(testRpcService, testingHaServices, slotManagerFactory);
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -108,11 +116,13 @@ public class SlotProtocolTest extends TestLogger {
 			Assert.fail("JobManager registration Future didn't become ready.");
 		}
 
+		final SlotManager slotManager = slotManagerFactory.slotManager;
+
 		final AllocationID allocationID = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
 
 		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
-		SlotRequestReply slotRequestReply =
+		RMSlotRequestReply slotRequestReply =
 			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
 
 		// 1) SlotRequest is routed to the SlotManager
@@ -124,15 +134,18 @@ public class SlotProtocolTest extends TestLogger {
 			allocationID);
 
 		// 3) SlotRequest leads to a container allocation
-		verify(resourceManager, timeout(5000)).startNewWorker(resourceProfile);
+		Assert.assertEquals(1, resourceManager.startNewWorkerCalled);
 
 		Assert.assertFalse(slotManager.isAllocated(allocationID));
 
 		// slot becomes available
 		final String tmAddress = "/tm1";
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
-			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
+		Mockito
+			.when(
+				taskExecutorGateway
+					.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
 		final ResourceID resourceID = ResourceID.generate();
@@ -141,13 +154,14 @@ public class SlotProtocolTest extends TestLogger {
 		final SlotStatus slotStatus =
 			new SlotStatus(slotID, resourceProfile);
 		final SlotReport slotReport =
-			new SlotReport(Collections.singletonList(slotStatus), resourceID);
+			new SlotReport(Collections.singletonList(slotStatus));
 		// register slot at SlotManager
-		slotManager.registerTaskExecutor(resourceID, taskExecutorGateway);
-		slotManager.updateSlotStatus(slotReport);
+		slotManager.registerTaskExecutor(
+			resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport);
 
 		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
-		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
+		verify(taskExecutorGateway, timeout(5000))
+			.requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class));
 	}
 
 	/**
@@ -173,13 +187,15 @@ public class SlotProtocolTest extends TestLogger {
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
-			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
+		Mockito.when(
+			taskExecutorGateway
+				.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
-		SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
-		ResourceManager resourceManager =
-			Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
+		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+		TestingResourceManager resourceManager =
+			Mockito.spy(new TestingResourceManager(testRpcService, testingHaServices, slotManagerFactory));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -191,6 +207,8 @@ public class SlotProtocolTest extends TestLogger {
 			Assert.fail("JobManager registration Future didn't become ready.");
 		}
 
+		final SlotManager slotManager = slotManagerFactory.slotManager;
+
 		final ResourceID resourceID = ResourceID.generate();
 		final AllocationID allocationID = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
@@ -199,13 +217,13 @@ public class SlotProtocolTest extends TestLogger {
 		final SlotStatus slotStatus =
 			new SlotStatus(slotID, resourceProfile);
 		final SlotReport slotReport =
-			new SlotReport(Collections.singletonList(slotStatus), resourceID);
+			new SlotReport(Collections.singletonList(slotStatus));
 		// register slot at SlotManager
-		slotManager.registerTaskExecutor(resourceID, taskExecutorGateway);
-		slotManager.updateSlotStatus(slotReport);
+		slotManager.registerTaskExecutor(
+			resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport);
 
 		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
-		SlotRequestReply slotRequestReply =
+		RMSlotRequestReply slotRequestReply =
 			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
 
 		// 1) a SlotRequest is routed to the SlotManager
@@ -220,9 +238,9 @@ public class SlotProtocolTest extends TestLogger {
 		Assert.assertTrue(slotManager.isAllocated(slotID));
 		Assert.assertTrue(slotManager.isAllocated(allocationID));
 
-
 		// 4) a SlotRequest is routed to the TaskExecutor
-		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
+		verify(taskExecutorGateway, timeout(5000))
+			.requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class));
 	}
 
 	private static TestingLeaderElectionService configureHA(
@@ -240,4 +258,32 @@ public class SlotProtocolTest extends TestLogger {
 		return rmLeaderElectionService;
 	}
 
+	private static class SpiedResourceManager extends TestingResourceManager {
+
+		private int startNewWorkerCalled = 0;
+
+		public SpiedResourceManager(
+				RpcService rpcService,
+				HighAvailabilityServices highAvailabilityServices,
+				SlotManagerFactory slotManagerFactory) {
+			super(rpcService, highAvailabilityServices, slotManagerFactory);
+		}
+
+
+		@Override
+		public void startNewWorker(ResourceProfile resourceProfile) {
+			startNewWorkerCalled++;
+		}
+	}
+
+	private static class TestingSlotManagerFactory implements SlotManagerFactory {
+
+		private SlotManager slotManager;
+
+		@Override
+		public SlotManager create(ResourceManagerServices rmServices) {
+			this.slotManager = Mockito.spy(new TestingSlotManager(rmServices));
+			return this.slotManager;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8adceede/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 9c1f288..7710fa9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -28,11 +30,15 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import org.powermock.api.mockito.PowerMockito;
@@ -76,7 +82,7 @@ public class TaskExecutorTest extends TestLogger {
 			String taskManagerAddress = taskManager.getAddress();
 
 			verify(rmGateway).registerTaskExecutor(
-					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class));
+					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
 		}
 		finally {
 			rpc.stopService();
@@ -132,7 +138,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address1, leaderId1);
 
 			verify(rmGateway1).registerTaskExecutor(
-					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class));
+					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// cancel the leader 
@@ -142,11 +148,95 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address2, leaderId2);
 
 			verify(rmGateway2).registerTaskExecutor(
-					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class));
+					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 		}
 		finally {
 			rpc.stopService();
 		}
 	}
+
+	/**
+	 * Tests that all allocation requests for slots are ignored if the slot has been reported as
+	 * free by the TaskExecutor but this report hasn't been confirmed by the ResourceManager.
+	 *
+	 * This is essential for the correctness of the state of the ResourceManager.
+	 */
+	@Test
+	public void testRejectAllocationRequestsForOutOfSyncSlots() {
+		final ResourceID resourceID = ResourceID.generate();
+
+		final String address1 = "/resource/manager/address/one";
+		final UUID leaderId = UUID.randomUUID();
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		try {
+			// register the mock resource manager gateways
+			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
+			rpc.registerGateway(address1, rmGateway1);
+
+			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+
+			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+			haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+
+			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+
+			TaskExecutor taskManager = new TaskExecutor(
+				taskManagerServicesConfiguration,
+				taskManagerLocation,
+				rpc,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				mock(MetricRegistry.class),
+				mock(FatalErrorHandler.class));
+
+			taskManager.start();
+			String taskManagerAddress = taskManager.getAddress();
+
+			// no connection initially, since there is no leader
+			assertNull(taskManager.getResourceManagerConnection());
+
+			// define a leader and see that a registration happens
+			testLeaderService.notifyListener(address1, leaderId);
+
+			verify(rmGateway1).registerTaskExecutor(
+				eq(leaderId), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+			assertNotNull(taskManager.getResourceManagerConnection());
+
+			// test that allocating a slot works
+			final SlotID slotID = new SlotID(resourceID, 0);
+			TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, new AllocationID(), leaderId);
+			assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered);
+
+			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
+			final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1);
+			taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID);
+			TMSlotRequestReply tmSlotRequestReply2 =
+				taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId);
+			assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected);
+
+			// re-register
+			verify(rmGateway1).registerTaskExecutor(
+				eq(leaderId), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+			testLeaderService.notifyListener(address1, leaderId);
+
+			// now we should be successful because the slots status has been synced
+			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
+			TMSlotRequestReply tmSlotRequestReply3 =
+				taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId);
+			assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered);
+
+		}
+		finally {
+			rpc.stopService();
+		}
+
+	}
 }