You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/28 15:15:49 UTC

[1/3] flink git commit: [FLINK-4606] Integrate the new ResourceManager with the existed FlinkResourceManager

Repository: flink
Updated Branches:
  refs/heads/flip-6 5513fe61d -> 387663094


[FLINK-4606]  Integrate the new ResourceManager with the existed FlinkResourceManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69d08ca4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69d08ca4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69d08ca4

Branch: refs/heads/flip-6
Commit: 69d08ca4ed877f55279c8177a6b00b8e406eda97
Parents: 5513fe6
Author: beyond1920 <be...@126.com>
Authored: Fri Sep 9 09:11:24 2016 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 12:30:40 2016 +0200

----------------------------------------------------------------------
 .../InfoMessageListenerRpcGateway.java          |  35 +++
 .../resourcemanager/ResourceManager.java        | 214 ++++++++++++++++---
 .../resourcemanager/ResourceManagerGateway.java |  23 ++
 .../StandaloneResourceManager.java              |  64 ++++++
 .../resourcemanager/ResourceManagerHATest.java  |   2 +-
 .../ResourceManagerJobMasterTest.java           |   2 +-
 .../ResourceManagerTaskExecutorTest.java        |   2 +-
 .../slotmanager/SlotProtocolTest.java           |   5 +-
 8 files changed, 318 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
new file mode 100644
index 0000000..c1eeefa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+/**
+ * A gateway to listen for info messages from {@link ResourceManager}
+ */
+public interface InfoMessageListenerRpcGateway extends RpcGateway {
+
+	/**
+	 * Notifies the listener about an info message sent by the resource manager.
+	 * @param infoMessage
+	 */
+	void notifyInfoMessage(InfoMessage infoMessage);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 88b8a11..83dc4db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,19 +20,22 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -42,8 +45,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
@@ -66,15 +67,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
 
-	private final Logger LOG = LoggerFactory.getLogger(getClass());
+	/** The exit code with which the process is stopped in case of a fatal error */
+	protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
 	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
 
-	private final Map<ResourceID, TaskExecutorRegistration> taskExecutorGateways;
+	private final Map<ResourceID, WorkerType> taskExecutorGateways;
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -84,16 +86,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 
 	private UUID leaderSessionID;
 
-	public ResourceManager(
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			SlotManager slotManager) {
+	private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
+
+	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.jobMasterGateways = new HashMap<>();
-		this.slotManager = slotManager;
+		this.slotManager = checkNotNull(slotManager);
 		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
 		this.taskExecutorGateways = new HashMap<>();
+		infoMessageListeners = new HashMap<>();
 	}
 
 	@Override
@@ -103,6 +105,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
+			// framework specific initialization
+			initialize();
 		} catch (Throwable e) {
 			log.error("A fatal error happened when starting the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -166,12 +170,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 						jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
 							highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
 					} catch (Exception e) {
-						LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+						log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
 						throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
 					}
 
 					if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
-						LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+						log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
 						throw new Exception("JobManager is not leading");
 					}
 
@@ -190,7 +194,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 							LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
 							jobMasterLeaderRetriever.start(jobMasterLeaderListener);
 						} catch (Exception e) {
-							LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+							log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
 							return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
 						}
 						jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
@@ -237,13 +241,24 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 				if (throwable != null) {
 					return new RegistrationResponse.Decline(throwable.getMessage());
 				} else {
-					InstanceID id = new InstanceID();
-					TaskExecutorRegistration oldTaskExecutor =
-						taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id));
-					if (oldTaskExecutor != null) {
-						log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+					WorkerType startedWorker = taskExecutorGateways.get(resourceID);
+					if(startedWorker != null) {
+						String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
+						if (taskExecutorAddress.equals(oldWorkerAddress)) {
+							log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+						} else {
+							log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
+								resourceID, oldWorkerAddress, taskExecutorAddress);
+							// TODO :: suggest old taskExecutor to stop itself
+							slotManager.notifyTaskManagerFailure(resourceID);
+							startedWorker = workerStarted(resourceID, taskExecutorGateway);
+							taskExecutorGateways.put(resourceID, startedWorker);
+						}
+					} else {
+						startedWorker = workerStarted(resourceID, taskExecutorGateway);
+						taskExecutorGateways.put(resourceID, startedWorker);
 					}
-					return new TaskExecutorRegistrationSuccess(id, 5000);
+					return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
 				}
 			}
 		}, getMainThreadExecutor());
@@ -263,14 +278,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 		if (jobMasterGateway != null) {
 			return slotManager.requestSlot(slotRequest);
 		} else {
-			LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
 			return new SlotRequestRejected(slotRequest.getAllocationId());
 		}
 	}
 
 
-
-
 	// ------------------------------------------------------------------------
 	//  Leader Contender
 	// ------------------------------------------------------------------------
@@ -324,6 +337,158 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 		shutDown();
 	}
 
+	/**
+	 * Registers an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+	 */
+	@RpcMethod
+	public void registerInfoMessageListener(final String infoMessageListenerAddress) {
+		if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+			log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
+		} else {
+			Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
+				@Override
+				public void accept(InfoMessageListenerRpcGateway gateway) {
+					log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
+					infoMessageListeners.put(infoMessageListenerAddress, gateway);
+				}
+			}, getMainThreadExecutor());
+
+			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+				@Override
+				public Void apply(Throwable failure) {
+					log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
+					return null;
+				}
+			}, getMainThreadExecutor());
+		}
+	}
+
+	/**
+	 * Unregisters an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+	 *
+	 */
+	@RpcMethod
+	public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
+		infoMessageListeners.remove(infoMessageListenerAddress);
+	}
+
+	/**
+	 * Shuts down the cluster.
+	 *
+	 * @param finalStatus
+	 * @param optionalDiagnostics
+	 */
+	@RpcMethod
+	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+		shutDownApplication(finalStatus, optionalDiagnostics);
+	}
+
+	/**
+	 * This method should be called by the framework once it detects that a currently registered task executor has failed.
+	 *
+	 * @param resourceID Id of the worker that has failed.
+	 * @param message An informational message that explains why the worker failed.
+	 */
+	public void notifyWorkerFailed(final ResourceID resourceID, String message) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				WorkerType worker = taskExecutorGateways.remove(resourceID);
+				if (worker != null) {
+					// TODO :: suggest failed task executor to stop itself
+					slotManager.notifyTaskManagerFailure(resourceID);
+				}
+			}
+		});
+	}
+
+	/**
+	 * Gets the number of currently started TaskManagers.
+	 *
+	 * @return The number of currently started TaskManagers.
+	 */
+	public int getNumberOfStartedTaskManagers() {
+		return taskExecutorGateways.size();
+	}
+
+	/**
+	 * Notifies the resource manager of a fatal error.
+	 *
+	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
+	 * such a way that a high-availability setting would restart this or fail over
+	 * to another master.
+	 */
+	public void onFatalError(final String message, final Throwable error) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				fatalError(message, error);
+			}
+		});
+	}
+
+	// ------------------------------------------------------------------------
+	//  Framework specific behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Initializes the framework specific components.
+	 *
+	 * @throws Exception Exceptions during initialization cause the resource manager to fail.
+	 */
+	protected abstract void initialize() throws Exception;
+
+	/**
+	 * Callback invoked when a task executor registers.
+	 *
+	 * @param resourceID The worker resource id
+	 * @param taskExecutorGateway the task executor gateway
+	 */
+	protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
+
+	/**
+	 * Callback invoked when the resource manager encounters a fatal error.
+	 * @param message
+	 * @param error
+	 */
+	protected abstract void fatalError(String message, Throwable error);
+
+	/**
+	 * The framework specific code for shutting down the application. This should report the
+	 * application's final status and shut down the resource manager cleanly.
+	 *
+	 * This method also needs to make sure all pending containers that are not registered
+	 * yet are returned.
+	 *
+	 * @param finalStatus The application status to report.
+	 * @param optionalDiagnostics An optional diagnostics message.
+	 */
+	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
+
+	// ------------------------------------------------------------------------
+	//  Info messaging
+	// ------------------------------------------------------------------------
+
+	public void sendInfoMessage(final String message) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				InfoMessage infoMessage = new InfoMessage(message);
+				for (InfoMessageListenerRpcGateway listenerRpcGateway : infoMessageListeners.values()) {
+					listenerRpcGateway
+						.notifyInfoMessage(infoMessage);
+				}
+			}
+		});
+	}
+
 	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
 
 		private final JobID jobID;
@@ -343,5 +508,6 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 			// TODO
 		}
 	}
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 484cea7..7c44006 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -75,4 +76,26 @@ public interface ResourceManagerGateway extends RpcGateway {
 		String taskExecutorAddress,
 		ResourceID resourceID,
 		@RpcTimeout Time timeout);
+
+	/**
+	 * Registers an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+	 */
+	void registerInfoMessageListener(String infoMessageListenerAddress);
+
+	/**
+	 * Unregisters an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+	 *
+	 */
+	void unRegisterInfoMessageListener(String infoMessageListenerAddress);
+
+	/**
+	 * Shuts down the cluster.
+	 * @param finalStatus
+	 * @param optionalDiagnostics
+	 */
+	void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
new file mode 100644
index 0000000..84db1ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+/**
+ * A standalone implementation of the resource manager. Used when the system is started in
+ * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+
+	public StandaloneResourceManager(RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		SlotManager slotManager) {
+		super(rpcService, highAvailabilityServices, slotManager);
+	}
+
+	@Override
+	protected void initialize() throws Exception {
+		// nothing to initialize
+	}
+
+	@Override
+	protected void fatalError(final String message, final Throwable error) {
+		log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+		// kill this process
+		System.exit(EXIT_CODE_FATAL_ERROR);
+	}
+
+	@Override
+	protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
+		InstanceID instanceID = new InstanceID();
+		TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID);
+		return taskExecutorRegistration;
+	}
+
+	@Override
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 64a1191..fdb83f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -55,7 +55,7 @@ public class ResourceManagerHATest {
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
 		SlotManager slotManager = mock(SlotManager.class);
-		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager);
+		final ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, slotManager);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
 		Assert.assertNull(resourceManager.getLeaderSessionID());

http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 332c093..8f09152 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -160,7 +160,7 @@ public class ResourceManagerJobMasterTest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
-		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index ed7c7d7..e6d1ed5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -121,7 +121,7 @@ public class ResourceManagerTaskExecutorTest {
 	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69d08ca4/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 0232fab..ff25897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -100,7 +101,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
 		ResourceManager resourceManager =
-			new ResourceManager(testRpcService, testingHaServices, slotManager);
+			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -179,7 +180,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
 		ResourceManager resourceManager =
-			new ResourceManager(testRpcService, testingHaServices, slotManager);
+			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 


[3/3] flink git commit: [FLINK-4606] integrate features of old ResourceManager

Posted by mx...@apache.org.
[FLINK-4606] integrate features of old ResourceManager

This closes #2540


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38766309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38766309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38766309

Branch: refs/heads/flip-6
Commit: 387663094adbffb898d3c3cc33eb33c65e18ba64
Parents: 87767ab
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 27 10:38:02 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 17:13:33 2016 +0200

----------------------------------------------------------------------
 .../InfoMessageListenerRpcGateway.java          |   1 -
 .../resourcemanager/ResourceManager.java        | 146 ++++++++++++-------
 .../resourcemanager/ResourceManagerGateway.java |   6 +-
 .../ResourceManagerServices.java                |  44 ++++++
 .../StandaloneResourceManager.java              |  19 ++-
 .../TaskExecutorRegistration.java               |  51 -------
 .../registration/TaskExecutorRegistration.java  |  51 +++++++
 .../slotmanager/SimpleSlotManager.java          |   6 -
 .../slotmanager/SlotManager.java                |  63 ++++++--
 .../slotmanager/SlotManagerTest.java            |  25 +++-
 .../slotmanager/SlotProtocolTest.java           |  42 +++---
 11 files changed, 295 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
index c1eeefa..d1373ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 83dc4db..190a4de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -48,11 +52,10 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,36 +67,43 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * It offers the following methods as part of its rpc interface to interact with the him remotely:
  * <ul>
  *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
+ *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
+public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
+		extends RpcEndpoint<ResourceManagerGateway>
+		implements LeaderContender {
 
 	/** The exit code with which the process is stopped in case of a fatal error */
 	protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
-	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
+	private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;
 
 	private final Map<ResourceID, WorkerType> taskExecutorGateways;
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	private LeaderElectionService leaderElectionService;
-
 	private final SlotManager slotManager;
 
+	private LeaderElectionService leaderElectionService;
+
 	private UUID leaderSessionID;
 
 	private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
-	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
+	private final Time timeout = Time.seconds(5);
+
+	public ResourceManager(
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.jobMasterGateways = new HashMap<>();
 		this.slotManager = checkNotNull(slotManager);
-		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+		this.jobMasterLeaderRetrievalListeners = new HashMap<>();
 		this.taskExecutorGateways = new HashMap<>();
 		infoMessageListeners = new HashMap<>();
 	}
@@ -105,6 +115,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
+			slotManager.setupResourceManagerServices(new DefaultResourceManagerServices());
 			// framework specific initialization
 			initialize();
 		} catch (Throwable e) {
@@ -117,7 +128,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
-			for(JobID jobID : jobMasterGateways.keySet()) {
+			for (JobID jobID : jobMasterGateways.keySet()) {
 				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
 			}
 			super.shutDown();
@@ -189,15 +200,17 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 					if (throwable != null) {
 						return new RegistrationResponse.Decline(throwable.getMessage());
 					} else {
-						JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
-						try {
-							LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
-							jobMasterLeaderRetriever.start(jobMasterLeaderListener);
-						} catch (Exception e) {
-							log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-							return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+						if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
+							JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+							try {
+								LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+								jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+							} catch (Exception e) {
+								log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+								return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+							}
+							jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
 						}
-						jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
 						final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
 						if (existingGateway != null) {
 							log.info("Replacing gateway for registered JobID {}.", jobID);
@@ -232,7 +245,6 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 						resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
 					throw new Exception("Invalid leader session id");
 				}
-
 				return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
 			}
 		}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -241,24 +253,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 				if (throwable != null) {
 					return new RegistrationResponse.Decline(throwable.getMessage());
 				} else {
-					WorkerType startedWorker = taskExecutorGateways.get(resourceID);
-					if(startedWorker != null) {
-						String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
-						if (taskExecutorAddress.equals(oldWorkerAddress)) {
-							log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
-						} else {
-							log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
-								resourceID, oldWorkerAddress, taskExecutorAddress);
-							// TODO :: suggest old taskExecutor to stop itself
-							slotManager.notifyTaskManagerFailure(resourceID);
-							startedWorker = workerStarted(resourceID, taskExecutorGateway);
-							taskExecutorGateways.put(resourceID, startedWorker);
-						}
-					} else {
-						startedWorker = workerStarted(resourceID, taskExecutorGateway);
-						taskExecutorGateways.put(resourceID, startedWorker);
+					WorkerType oldWorker = taskExecutorGateways.remove(resourceID);
+					if (oldWorker != null) {
+						// TODO :: suggest old taskExecutor to stop itself
+						slotManager.notifyTaskManagerFailure(resourceID);
 					}
-					return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
+					WorkerType newWorker = workerStarted(resourceID);
+					taskExecutorGateways.put(resourceID, newWorker);
+					return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
 				}
 			}
 		}, getMainThreadExecutor());
@@ -271,11 +273,20 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	 * @return Slot assignment
 	 */
 	@RpcMethod
-	public SlotRequestReply requestSlot(SlotRequest slotRequest) {
-		final JobID jobId = slotRequest.getJobId();
-		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+	public SlotRequestReply requestSlot(
+			UUID jobMasterLeaderID,
+			UUID resourceManagerLeaderID,
+			SlotRequest slotRequest) {
+
+		JobID jobId = slotRequest.getJobId();
+		JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+		JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId);
+
+		UUID leaderID = jobMasterLeaderListener.getLeaderID();
 
-		if (jobMasterGateway != null) {
+		if (jobMasterGateway != null
+				&& jobMasterLeaderID.equals(leaderID)
+				&& resourceManagerLeaderID.equals(leaderSessionID)) {
 			return slotManager.requestSlot(slotRequest);
 		} else {
 			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
@@ -379,7 +390,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	}
 
 	/**
-	 * Shutdowns cluster
+	 * Cleans up the application and shuts down the cluster
 	 *
 	 * @param finalStatus
 	 * @param optionalDiagnostics
@@ -446,17 +457,11 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	protected abstract void initialize() throws Exception;
 
 	/**
-	 * Callback when a task executor register.
+	 * Notifies the resource master of a fatal error.
 	 *
-	 * @param resourceID The worker resource id
-	 * @param taskExecutorGateway the task executor gateway
-	 */
-	protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
-
-	/**
-	 * Callback when a resource manager faced a fatal error
-	 * @param message
-	 * @param error
+	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
+	 * such a way that a high-availability setting would restart this or fail over
+	 * to another master.
 	 */
 	protected abstract void fatalError(String message, Throwable error);
 
@@ -472,6 +477,19 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 	 */
 	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
 
+	/**
+	 * Allocates a resource using the resource profile.
+	 * @param resourceProfile The resource description
+	 */
+	@VisibleForTesting
+	public abstract void startNewWorker(ResourceProfile resourceProfile);
+
+	/**
+	 * Callback when a worker was started.
+	 * @param resourceID The worker resource id
+	 */
+	protected abstract WorkerType workerStarted(ResourceID resourceID);
+
 	// ------------------------------------------------------------------------
 	//  Info messaging
 	// ------------------------------------------------------------------------
@@ -489,6 +507,24 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 		});
 	}
 
+	private class DefaultResourceManagerServices implements ResourceManagerServices {
+
+		@Override
+		public void allocateResource(ResourceProfile resourceProfile) {
+			ResourceManager.this.startNewWorker(resourceProfile);
+		}
+
+		@Override
+		public Executor getAsyncExecutor() {
+			return ResourceManager.this.getRpcService().getExecutor();
+		}
+
+		@Override
+		public Executor getExecutor() {
+			return ResourceManager.this.getMainThreadExecutor();
+		}
+	}
+
 	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
 
 		private final JobID jobID;
@@ -498,6 +534,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
 			this.jobID = jobID;
 		}
 
+		public JobID getJobID() {
+			return jobID;
+		}
+
+		public UUID getLeaderID() {
+			return leaderID;
+		}
+
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
 			this.leaderID = leaderSessionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7c44006..87303a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -59,7 +59,11 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest Slot request
 	 * @return Future slot assignment
 	 */
-	Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
+	Future<SlotRequestReply> requestSlot(
+		UUID jobMasterLeaderID,
+		UUID resourceManagerLeaderID,
+		SlotRequest slotRequest,
+		@RpcTimeout Time timeout);
 
 	/**
 	 * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
new file mode 100644
index 0000000..30994dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Interface which provides access to services of the ResourceManager.
+ */
+public interface ResourceManagerServices {
+
+	/**
+	 * Allocates a resource according to the resource profile.
+	 */
+	void allocateResource(ResourceProfile resourceProfile);
+
+	/**
+	 * Gets the async executor which executes outside of the main thread of the ResourceManager
+	 */
+	Executor getAsyncExecutor();
+
+	/**
+	 * Gets the executor which executes in the main thread of the ResourceManager
+	 */
+	Executor getExecutor();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 84db1ee..deca8d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -20,17 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
 /**
  * A standalone implementation of the resource manager. Used when the system is started in
  * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ *
+ * This ResourceManager doesn't acquire new resources.
  */
-public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
@@ -51,14 +52,16 @@ public class StandaloneResourceManager extends ResourceManager<ResourceManagerGa
 	}
 
 	@Override
-	protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
-		InstanceID instanceID = new InstanceID();
-		TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID);
-		return taskExecutorRegistration;
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+	public void startNewWorker(ResourceProfile resourceProfile) {
+	}
 
+	@Override
+	protected ResourceID workerStarted(ResourceID resourceID) {
+		return resourceID;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
deleted file mode 100644
index f8dfdc7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-
-import java.io.Serializable;
-
-/**
- * This class is responsible for group the TaskExecutorGateway and the InstanceID of a registered task executor.
- */
-public class TaskExecutorRegistration implements Serializable {
-
-	private static final long serialVersionUID = -2062957799469434614L;
-
-	private TaskExecutorGateway taskExecutorGateway;
-
-	private InstanceID instanceID;
-
-	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
-									InstanceID instanceID) {
-		this.taskExecutorGateway = taskExecutorGateway;
-		this.instanceID = instanceID;
-	}
-
-	public InstanceID getInstanceID() {
-		return instanceID;
-	}
-
-	public TaskExecutorGateway getTaskExecutorGateway() {
-		return taskExecutorGateway;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
new file mode 100644
index 0000000..6b21f5c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+/**
+ * This class groups the TaskExecutorGateway and the InstanceID of a registered task executor.
+ */
+public class TaskExecutorRegistration implements Serializable {
+
+	private static final long serialVersionUID = -2062957799469434614L;
+
+	private TaskExecutorGateway taskExecutorGateway;
+
+	private InstanceID instanceID;
+
+	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
+									InstanceID instanceID) {
+		this.taskExecutorGateway = taskExecutorGateway;
+		this.instanceID = instanceID;
+	}
+
+	public InstanceID getInstanceID() {
+		return instanceID;
+	}
+
+	public TaskExecutorGateway getTaskExecutorGateway() {
+		return taskExecutorGateway;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
index ef5ce31..ae1de5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -51,9 +50,4 @@ public class SimpleSlotManager extends SlotManager {
 		}
 	}
 
-	@Override
-	protected void allocateContainer(ResourceProfile resourceProfile) {
-		// TODO
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index a6d2196..a56b2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -22,16 +22,18 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +84,9 @@ public abstract class SlotManager {
 	/** The current leader id set by the ResourceManager */
 	private UUID leaderID;
 
+	/** The Resource allocation provider */
+	private ResourceManagerServices resourceManagerServices;
+
 	public SlotManager() {
 		this.registeredSlots = new HashMap<>(16);
 		this.pendingSlotRequests = new LinkedHashMap<>(16);
@@ -91,6 +96,16 @@ public abstract class SlotManager {
 		this.timeout = Time.seconds(10);
 	}
 
+	/**
+	 * Initializes the resource supplier which is needed to request new resources.
+	 */
+	public void setupResourceManagerServices(ResourceManagerServices resourceManagerServices) {
+		if (this.resourceManagerServices != null) {
+			throw new IllegalStateException("ResourceManagerServices may only be set once.");
+		}
+		this.resourceManagerServices = resourceManagerServices;
+	}
+
 
 	// ------------------------------------------------------------------------
 	//  slot managements
@@ -120,17 +135,32 @@ public abstract class SlotManager {
 
 			// record this allocation in bookkeeping
 			allocationMap.addAllocation(slot.getSlotId(), allocationId);
-
 			// remove selected slot from free pool
-			freeSlots.remove(slot.getSlotId());
+			final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
 
 			final Future<SlotRequestReply> slotRequestReplyFuture =
 				slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-			// TODO handle timeouts and response
+
+			slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
+				@Override
+				public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
+					if (throwable != null) {
+						// we failed, put the slot and the request back again
+						if (allocationMap.isAllocated(slot.getSlotId())) {
+							// only re-add if the slot hasn't been removed in the meantime
+							freeSlots.put(slot.getSlotId(), removedSlot);
+						}
+						pendingSlotRequests.put(allocationId, request);
+					}
+					return null;
+				}
+			}, resourceManagerServices.getExecutor());
 		} else {
 			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
 				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-			allocateContainer(request.getResourceProfile());
+			Preconditions.checkState(resourceManagerServices != null,
+				"Attempted to allocate resources but no ResourceManagerServices set.");
+			resourceManagerServices.allocateResource(request.getResourceProfile());
 			pendingSlotRequests.put(allocationId, request);
 		}
 
@@ -343,7 +373,7 @@ public abstract class SlotManager {
 
 		if (chosenRequest != null) {
 			final AllocationID allocationId = chosenRequest.getAllocationId();
-			pendingSlotRequests.remove(allocationId);
+			final SlotRequest removedSlotRequest = pendingSlotRequests.remove(allocationId);
 
 			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
 				allocationId, chosenRequest.getJobId());
@@ -351,7 +381,19 @@ public abstract class SlotManager {
 
 			final Future<SlotRequestReply> slotRequestReplyFuture =
 				freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-			// TODO handle timeouts and response
+
+			slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
+				@Override
+				public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
+					if (throwable != null) {
+						// we failed, add the request back again
+						if (allocationMap.isAllocated(freeSlot.getSlotId())) {
+							pendingSlotRequests.put(allocationId, removedSlotRequest);
+						}
+					}
+					return null;
+				}
+			}, resourceManagerServices.getExecutor());
 		} else {
 			freeSlots.put(freeSlot.getSlotId(), freeSlot);
 		}
@@ -417,13 +459,6 @@ public abstract class SlotManager {
 	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
 		final Map<AllocationID, SlotRequest> pendingRequests);
 
-	/**
-	 * The framework specific code for allocating a container for specified resource profile.
-	 *
-	 * @param resourceProfile The resource profile
-	 */
-	protected abstract void allocateContainer(final ResourceProfile resourceProfile);
-
 	// ------------------------------------------------------------------------
 	//  Helper classes
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 9ee9690..0fed79e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.junit.BeforeClass;
@@ -34,10 +38,13 @@ import org.mockito.Mockito;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 
 public class SlotManagerTest {
@@ -57,6 +64,8 @@ public class SlotManagerTest {
 	@BeforeClass
 	public static void setUp() {
 		taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
+		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
 	}
 
 	/**
@@ -498,12 +507,13 @@ public class SlotManagerTest {
 	//  testing classes
 	// ------------------------------------------------------------------------
 
-	private static class TestingSlotManager extends SlotManager {
+	private static class TestingSlotManager extends SlotManager implements ResourceManagerServices {
 
 		private final List<ResourceProfile> allocatedContainers;
 
 		TestingSlotManager() {
 			this.allocatedContainers = new LinkedList<>();
+			setupResourceManagerServices(this);
 		}
 
 		/**
@@ -543,12 +553,23 @@ public class SlotManagerTest {
 		}
 
 		@Override
-		protected void allocateContainer(ResourceProfile resourceProfile) {
+		public void allocateResource(ResourceProfile resourceProfile) {
 			allocatedContainers.add(resourceProfile);
 		}
 
+		@Override
+		public Executor getAsyncExecutor() {
+			return Mockito.mock(Executor.class);
+		}
+
+		@Override
+		public Executor getExecutor() {
+			return Mockito.mock(Executor.class);
+		}
+
 		List<ResourceProfile> getAllocatedContainers() {
 			return allocatedContainers;
 		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38766309/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index ff25897..e3018c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,18 +24,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.*;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -47,9 +43,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -99,9 +98,9 @@ public class SlotProtocolTest extends TestLogger {
 		TestingLeaderElectionService rmLeaderElectionService =
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
-		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+		SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
 		ResourceManager resourceManager =
-			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
+			Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -118,7 +117,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
 		SlotRequestReply slotRequestReply =
-			resourceManager.requestSlot(slotRequest);
+			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
 
 		// 1) SlotRequest is routed to the SlotManager
 		verify(slotManager).requestSlot(slotRequest);
@@ -129,13 +128,15 @@ public class SlotProtocolTest extends TestLogger {
 			allocationID);
 
 		// 3) SlotRequest leads to a container allocation
-		verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
+		verify(resourceManager, timeout(5000)).startNewWorker(resourceProfile);
 
 		Assert.assertFalse(slotManager.isAllocated(allocationID));
 
 		// slot becomes available
 		final String tmAddress = "/tm1";
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
 		final ResourceID resourceID = ResourceID.generate();
@@ -176,11 +177,13 @@ public class SlotProtocolTest extends TestLogger {
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+			.thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
-		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+		SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
 		ResourceManager resourceManager =
-			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
+			Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -207,7 +210,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
 		SlotRequestReply slotRequestReply =
-			resourceManager.requestSlot(slotRequest);
+			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
 
 		// 1) a SlotRequest is routed to the SlotManager
 		verify(slotManager).requestSlot(slotRequest);
@@ -241,15 +244,4 @@ public class SlotProtocolTest extends TestLogger {
 		return rmLeaderElectionService;
 	}
 
-	private static class TestingSlotManager extends SimpleSlotManager {
-
-		// change visibility of function to public for testing
-		@Override
-		public void allocateContainer(ResourceProfile resourceProfile) {
-			super.allocateContainer(resourceProfile);
-		}
-
-
-	}
-
 }


[2/3] flink git commit: [FLINK-4703] RpcCompletenessTest: Add support for type arguments and subclasses

Posted by mx...@apache.org.
[FLINK-4703] RpcCompletenessTest: Add support for type arguments and subclasses

This closes #2561


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87767ab4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87767ab4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87767ab4

Branch: refs/heads/flip-6
Commit: 87767ab4daf06300aac0e5d969e6e44808657187
Parents: 69d08ca
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 28 12:39:30 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 17:13:33 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 23 +++++-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 80 ++++++++++++++++++--
 2 files changed, 94 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87767ab4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 4e5e49a..79961f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -85,9 +85,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 
 		// IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer
 		// requires that selfGatewayType has been initialized
-		this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
+		this.selfGatewayType = determineSelfGatewayType();
 		this.self = rpcService.startServer(this);
-		
+
 		this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self);
 	}
 
@@ -255,4 +255,23 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 			gateway.runAsync(runnable);
 		}
 	}
+
+	/**
+	 * Determines the self gateway type declared by the runtime class or one of its superclasses.
+	 * Walks up the superclass chain until the first type argument found is an {@link RpcGateway} type.
+	 * @return The determined self gateway type
+	 */
+	private Class<C> determineSelfGatewayType() {
+
+		// determine self gateway type
+		Class c = getClass();
+		Class<C> determinedSelfGatewayType;
+		do {
+			determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c);
+			// check if super class contains self gateway type in next loop
+			c = c.getSuperclass();
+		} while (!RpcGateway.class.isAssignableFrom(determinedSelfGatewayType));
+
+		return determinedSelfGatewayType;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87767ab4/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 53355e8..e7143ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -26,9 +26,14 @@ import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,8 +46,33 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Test which ensures that all classes of subtype {@link RpcEndpoint} implement
+ * the methods specified in the generic gateway type argument.
+ *
+ * {@code
+ * 	    RpcEndpoint<GatewayTypeParameter extends RpcGateway>
+ * }
+ *
+ * Note that the class hierarchy can also be nested. In this case the gateway type parameter
+ * always has to be the first type parameter, e.g. {@code
+ *
+ * 	    // RpcClass needs to implement RpcGatewayClass' methods
+ * 	    RpcClass extends RpcEndpoint<RpcGatewayClass>
+ *
+ * 	    // RpcClass2 or its subclass needs to implement RpcGatewayClass' methods
+ *      RpcClass<GatewayTypeParameter extends RpcGateway,...> extends RpcEndpoint<GatewayTypeParameter>
+ *      RpcClass2 extends RpcClass<RpcGatewayClass,...>
+ *
+ *      // needless to say, this can even be nested further
+ *      ...
+ * }
+ *
+ */
 public class RpcCompletenessTest extends TestLogger {
 
+	private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
+
 	private static final Class<?> futureClass = Future.class;
 	private static final Class<?> timeoutClass = Time.class;
 
@@ -55,16 +85,52 @@ public class RpcCompletenessTest extends TestLogger {
 
 		Class<? extends RpcEndpoint> c;
 
-		for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
+		mainloop:
+		for (Class<? extends RpcEndpoint> rpcEndpoint : classes) {
 			c = rpcEndpoint;
 
-			Class<?> rpcGatewayType = ReflectionUtil.getTemplateType1(c);
+			LOG.debug("-------------");
+			LOG.debug("c: {}", c);
 
-			if (rpcGatewayType != null) {
-				checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
-			} else {
-				fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName());
+			// skip abstract classes
+			if (Modifier.isAbstract(c.getModifiers())) {
+				LOG.debug("Skipping abstract class");
+				continue;
 			}
+
+			// check for type parameter bound to RpcGateway
+			// skip if one is found because a subclass will provide the concrete argument
+			TypeVariable<? extends Class<? extends RpcEndpoint>>[] typeParameters = c.getTypeParameters();
+			LOG.debug("Checking {} parameters.", typeParameters.length);
+			for (int i = 0; i < typeParameters.length; i++) {
+				for (Type bound : typeParameters[i].getBounds()) {
+					LOG.debug("checking bound {} of type parameter {}", bound, typeParameters[i]);
+					if (bound.toString().equals("interface " + RpcGateway.class.getName())) {
+						if (i > 0) {
+							fail("Type parameter for RpcGateway should come first in " + c);
+						}
+						LOG.debug("Skipping class with type parameter bound to RpcGateway.");
+						// Type parameter is bound to RpcGateway which a subclass will provide
+						continue mainloop;
+					}
+				}
+			}
+
+			// check if this class or any super class contains the RpcGateway argument
+			Class<?> rpcGatewayType;
+			do {
+				LOG.debug("checking type argument of class: {}", c);
+				rpcGatewayType = ReflectionUtil.getTemplateType1(c);
+				LOG.debug("type argument is: {}", rpcGatewayType);
+
+				c = (Class<? extends RpcEndpoint>) c.getSuperclass();
+
+			} while (!RpcGateway.class.isAssignableFrom(rpcGatewayType));
+
+			LOG.debug("Checking RRC completeness of endpoint '{}' with gateway '{}'",
+				rpcEndpoint.getSimpleName(), rpcGatewayType.getSimpleName());
+
+			checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
 		}
 	}
 
@@ -352,7 +418,7 @@ public class RpcCompletenessTest extends TestLogger {
 	 */
 	private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) {
 		if(!interfaceClass.isInterface()) {
-			fail(interfaceClass.getName() + "is not a interface");
+			fail(interfaceClass.getName() + " is not a interface");
 		}
 
 		ArrayList<Method> allMethods = new ArrayList<>();