You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:21:59 UTC

[19/50] [abbrv] flink git commit: [FLINK-4606] Integrate the new ResourceManager with the existed FlinkResourceManager

[FLINK-4606]  Integrate the new ResourceManager with the existed FlinkResourceManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2bd7e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2bd7e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2bd7e7

Branch: refs/heads/flip-6
Commit: 0d2bd7e7a1d7fcf1160543a6217326818f3832c5
Parents: 1d25ea8
Author: beyond1920 <be...@126.com>
Authored: Fri Sep 9 09:11:24 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:46:26 2016 +0200

----------------------------------------------------------------------
 .../InfoMessageListenerRpcGateway.java          |  35 +++
 .../resourcemanager/ResourceManager.java        | 214 ++++++++++++++++---
 .../resourcemanager/ResourceManagerGateway.java |  23 ++
 .../StandaloneResourceManager.java              |  64 ++++++
 .../resourcemanager/ResourceManagerHATest.java  |   2 +-
 .../ResourceManagerJobMasterTest.java           |   2 +-
 .../ResourceManagerTaskExecutorTest.java        |   2 +-
 .../slotmanager/SlotProtocolTest.java           |   5 +-
 8 files changed, 318 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
new file mode 100644
index 0000000..c1eeefa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+/**
+ * A gateway to listen for info messages from {@link ResourceManager}
+ */
+public interface InfoMessageListenerRpcGateway extends RpcGateway {
+
+	/**
+	 * Notifies the listener about an info message sent by the resource manager.
+	 * @param infoMessage the info message to deliver to the listener
+	 */
+	void notifyInfoMessage(InfoMessage infoMessage);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 88b8a11..83dc4db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,19 +20,22 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -42,8 +45,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
@@ -66,15 +67,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
 
-	private final Logger LOG = LoggerFactory.getLogger(getClass());
+	/** The exit code with which the process is stopped in case of a fatal error */
+	protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
 	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
 
-	private final Map<ResourceID, TaskExecutorRegistration> taskExecutorGateways;
+	private final Map<ResourceID, WorkerType> taskExecutorGateways;
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -84,16 +86,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 
 	private UUID leaderSessionID;
 
-	public ResourceManager(
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			SlotManager slotManager) {
+	private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
+
+	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.jobMasterGateways = new HashMap<>();
-		this.slotManager = slotManager;
+		this.slotManager = checkNotNull(slotManager);
 		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
 		this.taskExecutorGateways = new HashMap<>();
+		infoMessageListeners = new HashMap<>();
 	}
 
 	@Override
@@ -103,6 +105,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
+			// framework specific initialization
+			initialize();
 		} catch (Throwable e) {
 			log.error("A fatal error happened when starting the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -166,12 +170,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 						jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
 							highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
 					} catch (Exception e) {
-						LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+						log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
 						throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
 					}
 
 					if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
-						LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+						log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
 						throw new Exception("JobManager is not leading");
 					}
 
@@ -190,7 +194,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 							LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
 							jobMasterLeaderRetriever.start(jobMasterLeaderListener);
 						} catch (Exception e) {
-							LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+							log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
 							return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
 						}
 						jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
@@ -237,13 +241,24 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 				if (throwable != null) {
 					return new RegistrationResponse.Decline(throwable.getMessage());
 				} else {
-					InstanceID id = new InstanceID();
-					TaskExecutorRegistration oldTaskExecutor =
-						taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id));
-					if (oldTaskExecutor != null) {
-						log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+					WorkerType startedWorker = taskExecutorGateways.get(resourceID);
+					if(startedWorker != null) {
+						String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
+						if (taskExecutorAddress.equals(oldWorkerAddress)) {
+							log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+						} else {
+							log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
+								resourceID, oldWorkerAddress, taskExecutorAddress);
+							// TODO :: suggest old taskExecutor to stop itself
+							slotManager.notifyTaskManagerFailure(resourceID);
+							startedWorker = workerStarted(resourceID, taskExecutorGateway);
+							taskExecutorGateways.put(resourceID, startedWorker);
+						}
+					} else {
+						startedWorker = workerStarted(resourceID, taskExecutorGateway);
+						taskExecutorGateways.put(resourceID, startedWorker);
 					}
-					return new TaskExecutorRegistrationSuccess(id, 5000);
+					return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
 				}
 			}
 		}, getMainThreadExecutor());
@@ -263,14 +278,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 		if (jobMasterGateway != null) {
 			return slotManager.requestSlot(slotRequest);
 		} else {
-			LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
 			return new SlotRequestRejected(slotRequest.getAllocationId());
 		}
 	}
 
 
-
-
 	// ------------------------------------------------------------------------
 	//  Leader Contender
 	// ------------------------------------------------------------------------
@@ -324,6 +337,158 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 		shutDown();
 	}
 
+	/**
+	 * Registers an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+	 */
+	@RpcMethod
+	public void registerInfoMessageListener(final String infoMessageListenerAddress) {
+		if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+			log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
+		} else {
+			Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
+				@Override
+				public void accept(InfoMessageListenerRpcGateway gateway) {
+					log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
+					infoMessageListeners.put(infoMessageListenerAddress, gateway);
+				}
+			}, getMainThreadExecutor());
+
+			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+				@Override
+				public Void apply(Throwable failure) {
+					log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
+					return null;
+				}
+			}, getMainThreadExecutor());
+		}
+	}
+
+	/**
+	 * Unregisters an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+	 *
+	 */
+	@RpcMethod
+	public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
+		infoMessageListeners.remove(infoMessageListenerAddress);
+	}
+
+	/**
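+	 * Shuts down the cluster by invoking the framework specific application shutdown.
+	 *
+	 * @param finalStatus the final application status to report
+	 * @param optionalDiagnostics an optional diagnostics message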
+	 */
+	@RpcMethod
+	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+		shutDownApplication(finalStatus, optionalDiagnostics);
+	}
+
+	/**
+	 * This method should be called by the framework once it detects that a currently registered task executor has failed.
+	 *
+	 * @param resourceID Id of the worker that has failed.
+	 * @param message An informational message that explains why the worker failed.
+	 */
+	public void notifyWorkerFailed(final ResourceID resourceID, String message) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				WorkerType worker = taskExecutorGateways.remove(resourceID);
+				if (worker != null) {
+					// TODO :: suggest failed task executor to stop itself
+					slotManager.notifyTaskManagerFailure(resourceID);
+				}
+			}
+		});
+	}
+
+	/**
+	 * Gets the number of currently started TaskManagers.
+	 *
+	 * @return The number of currently started TaskManagers.
+	 */
+	public int getNumberOfStartedTaskManagers() {
+		return taskExecutorGateways.size();
+	}
+
+	/**
+	 * Notifies the resource manager of a fatal error.
+	 *
+	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
+	 * such a way that a high-availability setting would restart this or fail over
+	 * to another master.
+	 */
+	public void onFatalError(final String message, final Throwable error) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				fatalError(message, error);
+			}
+		});
+	}
+
+	// ------------------------------------------------------------------------
+	//  Framework specific behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Initializes the framework specific components.
+	 *
+	 * @throws Exception Exceptions during initialization cause the resource manager to fail.
+	 */
+	protected abstract void initialize() throws Exception;
+
+	/**
+	 * Callback invoked when a task executor registers; the returned worker is stored for that resource id.
+	 *
+	 * @param resourceID The worker resource id
+	 * @param taskExecutorGateway the task executor gateway
+	 */
+	protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
+
+	/**
+	 * Callback invoked when the resource manager encounters a fatal error.
+	 * @param message a description of the fatal error
+	 * @param error the throwable associated with the fatal error
+	 */
+	protected abstract void fatalError(String message, Throwable error);
+
+	/**
+	 * The framework specific code for shutting down the application. This should report the
+	 * application's final status and shut down the resource manager cleanly.
+	 *
+	 * This method also needs to make sure all pending containers that are not registered
+	 * yet are returned.
+	 *
+	 * @param finalStatus The application status to report.
+	 * @param optionalDiagnostics An optional diagnostics message.
+	 */
+	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
+
+	// ------------------------------------------------------------------------
+	//  Info messaging
+	// ------------------------------------------------------------------------
+
+	public void sendInfoMessage(final String message) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				InfoMessage infoMessage = new InfoMessage(message);
+				for (InfoMessageListenerRpcGateway listenerRpcGateway : infoMessageListeners.values()) {
+					listenerRpcGateway
+						.notifyInfoMessage(infoMessage);
+				}
+			}
+		});
+	}
+
 	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
 
 		private final JobID jobID;
@@ -343,5 +508,6 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 			// TODO
 		}
 	}
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 484cea7..7c44006 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -75,4 +76,26 @@ public interface ResourceManagerGateway extends RpcGateway {
 		String taskExecutorAddress,
 		ResourceID resourceID,
 		@RpcTimeout Time timeout);
+
+	/**
+	 * Registers an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+	 */
+	void registerInfoMessageListener(String infoMessageListenerAddress);
+
+	/**
+	 * Unregisters an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+	 *
+	 */
+	void unRegisterInfoMessageListener(String infoMessageListenerAddress);
+
+	/**
+	 * Shuts down the cluster.
+	 * @param finalStatus the final application status to report
+	 * @param optionalDiagnostics an optional diagnostics message
+	 */
+	void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
new file mode 100644
index 0000000..84db1ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+/**
+ * A standalone implementation of the resource manager. Used when the system is started in
+ * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+
+	public StandaloneResourceManager(RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		SlotManager slotManager) {
+		super(rpcService, highAvailabilityServices, slotManager);
+	}
+
+	@Override
+	protected void initialize() throws Exception {
+		// nothing to initialize
+	}
+
+	@Override
+	protected void fatalError(final String message, final Throwable error) {
+		log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+		// kill this process
+		System.exit(EXIT_CODE_FATAL_ERROR);
+	}
+
+	@Override
+	protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
+		InstanceID instanceID = new InstanceID();
+		TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID);
+		return taskExecutorRegistration;
+	}
+
+	@Override
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 64a1191..fdb83f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -55,7 +55,7 @@ public class ResourceManagerHATest {
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
 		SlotManager slotManager = mock(SlotManager.class);
-		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager);
+		final ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, slotManager);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
 		Assert.assertNull(resourceManager.getLeaderSessionID());

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 332c093..8f09152 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -160,7 +160,7 @@ public class ResourceManagerJobMasterTest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
-		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index ed7c7d7..e6d1ed5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -121,7 +121,7 @@ public class ResourceManagerTaskExecutorTest {
 	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2bd7e7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 0232fab..ff25897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -100,7 +101,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
 		ResourceManager resourceManager =
-			new ResourceManager(testRpcService, testingHaServices, slotManager);
+			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -179,7 +180,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
 		ResourceManager resourceManager =
-			new ResourceManager(testRpcService, testingHaServices, slotManager);
+			new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);