You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:14 UTC

[32/50] [abbrv] flink git commit: [FLINK-4606] integrate features of old ResourceManager

[FLINK-4606] integrate features of old ResourceManager

This closes #2540


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f198d8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f198d8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f198d8c

Branch: refs/heads/flip-6
Commit: 1f198d8ca56ec6719d112cdc7180aeef6d18477a
Parents: 6e58ebf
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 27 10:38:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../InfoMessageListenerRpcGateway.java          |   1 -
 .../resourcemanager/ResourceManager.java        | 146 ++++++++++++-------
 .../resourcemanager/ResourceManagerGateway.java |   6 +-
 .../ResourceManagerServices.java                |  44 ++++++
 .../StandaloneResourceManager.java              |  19 ++-
 .../TaskExecutorRegistration.java               |  51 -------
 .../registration/TaskExecutorRegistration.java  |  51 +++++++
 .../slotmanager/SimpleSlotManager.java          |   6 -
 .../slotmanager/SlotManager.java                |  63 ++++++--
 .../slotmanager/SlotManagerTest.java            |  25 +++-
 .../slotmanager/SlotProtocolTest.java           |  42 +++---
 11 files changed, 295 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
index c1eeefa..d1373ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 83dc4db..190a4de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -48,11 +52,10 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,36 +67,43 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * It offers the following methods as part of its rpc interface to interact with the him remotely:
  * <ul>
  *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
+ *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
+public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
+		extends RpcEndpoint<ResourceManagerGateway>
+		implements LeaderContender {
 
 	/** The exit code with which the process is stopped in case of a fatal error */
 	protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
-	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
+	private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;
 
 	private final Map<ResourceID, WorkerType> taskExecutorGateways;
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	private LeaderElectionService leaderElectionService;
-
 	private final SlotManager slotManager;
 
+	private LeaderElectionService leaderElectionService;
+
 	private UUID leaderSessionID;
 
 	private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
-	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
+	private final Time timeout = Time.seconds(5);
+
+	public ResourceManager(
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.jobMasterGateways = new HashMap<>();
 		this.slotManager = checkNotNull(slotManager);
-		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+		this.jobMasterLeaderRetrievalListeners = new HashMap<>();
 		this.taskExecutorGateways = new HashMap<>();
 		infoMessageListeners = new HashMap<>();
 	}
@@ -105,6 +115,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
+			slotManager.setupResourceManagerServices(new DefaultResourceManagerServices());
 			// framework specific initialization
 			initialize();
 		} catch (Throwable e) {
@@ -117,7 +128,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
-			for(JobID jobID : jobMasterGateways.keySet()) {
+			for (JobID jobID : jobMasterGateways.keySet()) {
 				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
 			}
 			super.shutDown();
@@ -189,15 +200,17 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 					if (throwable != null) {
 						return new RegistrationResponse.Decline(throwable.getMessage());
 					} else {
-						JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
-						try {
-							LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
-							jobMasterLeaderRetriever.start(jobMasterLeaderListener);
-						} catch (Exception e) {
-							log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-							return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+						if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
+							JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+							try {
+								LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+								jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+							} catch (Exception e) {
+								log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+								return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+							}
+							jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
 						}
-						jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
 						final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
 						if (existingGateway != null) {
 							log.info("Replacing gateway for registered JobID {}.", jobID);
@@ -232,7 +245,6 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 						resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
 					throw new Exception("Invalid leader session id");
 				}
-
 				return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
 			}
 		}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -241,24 +253,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 				if (throwable != null) {
 					return new RegistrationResponse.Decline(throwable.getMessage());
 				} else {
-					WorkerType startedWorker = taskExecutorGateways.get(resourceID);
-					if(startedWorker != null) {
-						String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
-						if (taskExecutorAddress.equals(oldWorkerAddress)) {
-							log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
-						} else {
-							log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
-								resourceID, oldWorkerAddress, taskExecutorAddress);
-							// TODO :: suggest old taskExecutor to stop itself
-							slotManager.notifyTaskManagerFailure(resourceID);
-							startedWorker = workerStarted(resourceID, taskExecutorGateway);
-							taskExecutorGateways.put(resourceID, startedWorker);
-						}
-					} else {
-						startedWorker = workerStarted(resourceID, taskExecutorGateway);
-						taskExecutorGateways.put(resourceID, startedWorker);
+					WorkerType oldWorker = taskExecutorGateways.remove(resourceID);
+					if (oldWorker != null) {
+						// TODO :: suggest old taskExecutor to stop itself
+						slotManager.notifyTaskManagerFailure(resourceID);
 					}
-					return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
+					WorkerType newWorker = workerStarted(resourceID);
+					taskExecutorGateways.put(resourceID, newWorker);
+					return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
 				}
 			}
 		}, getMainThreadExecutor());
@@ -271,11 +273,20 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	 * @return Slot assignment
 	 */
 	@RpcMethod
-	public SlotRequestReply requestSlot(SlotRequest slotRequest) {
-		final JobID jobId = slotRequest.getJobId();
-		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+	public SlotRequestReply requestSlot(
+			UUID jobMasterLeaderID,
+			UUID resourceManagerLeaderID,
+			SlotRequest slotRequest) {
+
+		JobID jobId = slotRequest.getJobId();
+		JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+		JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId);
+
+		UUID leaderID = jobMasterLeaderListener.getLeaderID();
 
-		if (jobMasterGateway != null) {
+		if (jobMasterGateway != null
+				&& jobMasterLeaderID.equals(leaderID)
+				&& resourceManagerLeaderID.equals(leaderSessionID)) {
 			return slotManager.requestSlot(slotRequest);
 		} else {
 			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
@@ -379,7 +390,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	}
 
 	/**
-	 * Shutdowns cluster
+	 * Cleanup application and shut down cluster
 	 *
 	 * @param finalStatus
 	 * @param optionalDiagnostics
@@ -446,17 +457,11 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	protected abstract void initialize() throws Exception;
 
 	/**
-	 * Callback when a task executor register.
+	 * Notifies the resource master of a fatal error.
 	 *
-	 * @param resourceID The worker resource id
-	 * @param taskExecutorGateway the task executor gateway
-	 */
-	protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
-
-	/**
-	 * Callback when a resource manager faced a fatal error
-	 * @param message
-	 * @param error
+	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
+	 * such a way that a high-availability setting would restart this or fail over
+	 * to another master.
 	 */
 	protected abstract void fatalError(String message, Throwable error);
 
@@ -472,6 +477,19 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	 */
 	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
 
+	/**
+	 * Allocates a resource using the resource profile.
+	 * @param resourceProfile The resource description
+	 */
+	@VisibleForTesting
+	public abstract void startNewWorker(ResourceProfile resourceProfile);
+
+	/**
+	 * Callback when a worker was started.
+	 * @param resourceID The worker resource id
+	 */
+	protected abstract WorkerType workerStarted(ResourceID resourceID);
+
 	// ------------------------------------------------------------------------
 	//  Info messaging
 	// ------------------------------------------------------------------------
@@ -489,6 +507,24 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 		});
 	}
 
+	private class DefaultResourceManagerServices implements ResourceManagerServices {
+
+		@Override
+		public void allocateResource(ResourceProfile resourceProfile) {
+			ResourceManager.this.startNewWorker(resourceProfile);
+		}
+
+		@Override
+		public Executor getAsyncExecutor() {
+			return ResourceManager.this.getRpcService().getExecutor();
+		}
+
+		@Override
+		public Executor getExecutor() {
+			return ResourceManager.this.getMainThreadExecutor();
+		}
+	}
+
 	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
 
 		private final JobID jobID;
@@ -498,6 +534,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 			this.jobID = jobID;
 		}
 
+		public JobID getJobID() {
+			return jobID;
+		}
+
+		public UUID getLeaderID() {
+			return leaderID;
+		}
+
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
 			this.leaderID = leaderSessionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7c44006..87303a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -59,7 +59,11 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest Slot request
 	 * @return Future slot assignment
 	 */
-	Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
+	Future<SlotRequestReply> requestSlot(
+		UUID jobMasterLeaderID,
+		UUID resourceManagerLeaderID,
+		SlotRequest slotRequest,
+		@RpcTimeout Time timeout);
 
 	/**
 	 * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
new file mode 100644
index 0000000..30994dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Interface which provides access to services of the ResourceManager.
+ */
+public interface ResourceManagerServices {
+
+	/**
+	 * Allocates a resource according to the resource profile.
+	 */
+	void allocateResource(ResourceProfile resourceProfile);
+
+	/**
+	 * Gets the async excutor which executes outside of the main thread of the ResourceManager
+	 */
+	Executor getAsyncExecutor();
+
+	/**
+	 * Gets the executor which executes in the main thread of the ResourceManager
+	 */
+	Executor getExecutor();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 84db1ee..deca8d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -20,17 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
 /**
  * A standalone implementation of the resource manager. Used when the system is started in
  * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ *
+ * This ResourceManager doesn't acquire new resources.
  */
-public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
@@ -51,14 +52,16 @@ public class StandaloneResourceManager extends ResourceManager<ResourceManagerGa
 	}
 
 	@Override
-	protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
-		InstanceID instanceID = new InstanceID();
-		TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID);
-		return taskExecutorRegistration;
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+	public void startNewWorker(ResourceProfile resourceProfile) {
+	}
 
+	@Override
+	protected ResourceID workerStarted(ResourceID resourceID) {
+		return resourceID;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
deleted file mode 100644
index f8dfdc7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-
-import java.io.Serializable;
-
-/**
- * This class is responsible for group the TaskExecutorGateway and the InstanceID of a registered task executor.
- */
-public class TaskExecutorRegistration implements Serializable {
-
-	private static final long serialVersionUID = -2062957799469434614L;
-
-	private TaskExecutorGateway taskExecutorGateway;
-
-	private InstanceID instanceID;
-
-	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
-									InstanceID instanceID) {
-		this.taskExecutorGateway = taskExecutorGateway;
-		this.instanceID = instanceID;
-	}
-
-	public InstanceID getInstanceID() {
-		return instanceID;
-	}
-
-	public TaskExecutorGateway getTaskExecutorGateway() {
-		return taskExecutorGateway;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
new file mode 100644
index 0000000..6b21f5c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+/**
+ * This class is responsible for group the TaskExecutorGateway and the InstanceID of a registered task executor.
+ */
+public class TaskExecutorRegistration implements Serializable {
+
+	private static final long serialVersionUID = -2062957799469434614L;
+
+	private TaskExecutorGateway taskExecutorGateway;
+
+	private InstanceID instanceID;
+
+	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
+									InstanceID instanceID) {
+		this.taskExecutorGateway = taskExecutorGateway;
+		this.instanceID = instanceID;
+	}
+
+	public InstanceID getInstanceID() {
+		return instanceID;
+	}
+
+	public TaskExecutorGateway getTaskExecutorGateway() {
+		return taskExecutorGateway;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
index ef5ce31..ae1de5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -51,9 +50,4 @@ public class SimpleSlotManager extends SlotManager {
 		}
 	}
 
-	@Override
-	protected void allocateContainer(ResourceProfile resourceProfile) {
-		// TODO
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index a6d2196..a56b2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -22,16 +22,18 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +84,9 @@ public abstract class SlotManager {
 	/** The current leader id set by the ResourceManager */
 	private UUID leaderID;
 
+	/** The Resource allocation provider */
+	private ResourceManagerServices resourceManagerServices;
+
 	public SlotManager() {
 		this.registeredSlots = new HashMap<>(16);
 		this.pendingSlotRequests = new LinkedHashMap<>(16);
@@ -91,6 +96,16 @@ public abstract class SlotManager {
 		this.timeout = Time.seconds(10);
 	}
 
+	/**
+	 * Initializes the resource supplier which is needed to request new resources.
+	 */
+	public void setupResourceManagerServices(ResourceManagerServices resourceManagerServices) {
+		if (this.resourceManagerServices != null) {
+			throw new IllegalStateException("ResourceManagerServices may only be set once.");
+		}
+		this.resourceManagerServices = resourceManagerServices;
+	}
+
 
 	// ------------------------------------------------------------------------
 	//  slot managements
@@ -120,17 +135,32 @@ public abstract class SlotManager {
 
 			// record this allocation in bookkeeping
 			allocationMap.addAllocation(slot.getSlotId(), allocationId);
-
 			// remove selected slot from free pool
-			freeSlots.remove(slot.getSlotId());
+			final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
 
 			final Future<SlotRequestReply> slotRequestReplyFuture =
 				slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-			// TODO handle timeouts and response
+
+			slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
+				@Override
+				public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
+					if (throwable != null) {
+						// we failed, put the slot and the request back again
+						if (allocationMap.isAllocated(slot.getSlotId())) {
+							// only re-add if the slot hasn't been removed in the meantime
+							freeSlots.put(slot.getSlotId(), removedSlot);
+						}
+						pendingSlotRequests.put(allocationId, request);
+					}
+					return null;
+				}
+			}, resourceManagerServices.getExecutor());
 		} else {
 			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
 				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-			allocateContainer(request.getResourceProfile());
+			Preconditions.checkState(resourceManagerServices != null,
+				"Attempted to allocate resources but no ResourceManagerServices set.");
+			resourceManagerServices.allocateResource(request.getResourceProfile());
 			pendingSlotRequests.put(allocationId, request);
 		}
 
@@ -343,7 +373,7 @@ public abstract class SlotManager {
 
 		if (chosenRequest != null) {
 			final AllocationID allocationId = chosenRequest.getAllocationId();
-			pendingSlotRequests.remove(allocationId);
+			final SlotRequest removedSlotRequest = pendingSlotRequests.remove(allocationId);
 
 			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
 				allocationId, chosenRequest.getJobId());
@@ -351,7 +381,19 @@ public abstract class SlotManager {
 
 			final Future<SlotRequestReply> slotRequestReplyFuture =
 				freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-			// TODO handle timeouts and response
+
+			slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
+				@Override
+				public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
+					if (throwable != null) {
+						// we failed, add the request back again
+						if (allocationMap.isAllocated(freeSlot.getSlotId())) {
+							pendingSlotRequests.put(allocationId, removedSlotRequest);
+						}
+					}
+					return null;
+				}
+			}, resourceManagerServices.getExecutor());
 		} else {
 			freeSlots.put(freeSlot.getSlotId(), freeSlot);
 		}
@@ -417,13 +459,6 @@ public abstract class SlotManager {
 	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
 		final Map<AllocationID, SlotRequest> pendingRequests);
 
-	/**
-	 * The framework specific code for allocating a container for specified resource profile.
-	 *
-	 * @param resourceProfile The resource profile
-	 */
-	protected abstract void allocateContainer(final ResourceProfile resourceProfile);
-
 	// ------------------------------------------------------------------------
 	//  Helper classes
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 9ee9690..0fed79e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.junit.BeforeClass;
@@ -34,10 +38,13 @@ import org.mockito.Mockito;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 
 public class SlotManagerTest {
@@ -57,6 +64,8 @@ public class SlotManagerTest {
 	@BeforeClass
 	public static void setUp() {
 		taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
+		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
 	}
 
 	/**
@@ -498,12 +507,13 @@ public class SlotManagerTest {
 	//  testing classes
 	// ------------------------------------------------------------------------
 
-	private static class TestingSlotManager extends SlotManager {
+	private static class TestingSlotManager extends SlotManager implements ResourceManagerServices {
 
 		private final List<ResourceProfile> allocatedContainers;
 
 		TestingSlotManager() {
 			this.allocatedContainers = new LinkedList<>();
+			setupResourceManagerServices(this);
 		}
 
 		/**
@@ -543,12 +553,23 @@ public class SlotManagerTest {
 		}
 
 		@Override
-		protected void allocateContainer(ResourceProfile resourceProfile) {
+		public void allocateResource(ResourceProfile resourceProfile) {
 			allocatedContainers.add(resourceProfile);
 		}
 
+		@Override
+		public Executor getAsyncExecutor() {
+			return Mockito.mock(Executor.class);
+		}
+
+		@Override
+		public Executor getExecutor() {
+			return Mockito.mock(Executor.class);
+		}
+
 		List<ResourceProfile> getAllocatedContainers() {
 			return allocatedContainers;
 		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index ff25897..e3018c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,18 +24,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.*;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -47,9 +43,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -99,9 +98,9 @@ public class SlotProtocolTest extends TestLogger {
 		TestingLeaderElectionService rmLeaderElectionService =
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
-		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+		SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
 		ResourceManager resourceManager =
-			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
+			Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -118,7 +117,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
 		SlotRequestReply slotRequestReply =
-			resourceManager.requestSlot(slotRequest);
+			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
 
 		// 1) SlotRequest is routed to the SlotManager
 		verify(slotManager).requestSlot(slotRequest);
@@ -129,13 +128,15 @@ public class SlotProtocolTest extends TestLogger {
 			allocationID);
 
 		// 3) SlotRequest leads to a container allocation
-		verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
+		verify(resourceManager, timeout(5000)).startNewWorker(resourceProfile);
 
 		Assert.assertFalse(slotManager.isAllocated(allocationID));
 
 		// slot becomes available
 		final String tmAddress = "/tm1";
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
 		final ResourceID resourceID = ResourceID.generate();
@@ -176,11 +177,13 @@ public class SlotProtocolTest extends TestLogger {
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
-		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+		SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
 		ResourceManager resourceManager =
-			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
+			Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -207,7 +210,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
 		SlotRequestReply slotRequestReply =
-			resourceManager.requestSlot(slotRequest);
+			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
 
 		// 1) a SlotRequest is routed to the SlotManager
 		verify(slotManager).requestSlot(slotRequest);
@@ -241,15 +244,4 @@ public class SlotProtocolTest extends TestLogger {
 		return rmLeaderElectionService;
 	}
 
-	private static class TestingSlotManager extends SimpleSlotManager {
-
-		// change visibility of function to public for testing
-		@Override
-		public void allocateContainer(ResourceProfile resourceProfile) {
-			super.allocateContainer(resourceProfile);
-		}
-
-
-	}
-
 }