You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/04/25 18:51:44 UTC

flink git commit: [FLINK-6155] Introduce an endpoint id for RpcEndpoints

Repository: flink
Updated Branches:
  refs/heads/master 651e8561f -> 433a345ed


[FLINK-6155] Introduce an endpoint id for RpcEndpoints

An endpoint id allows to assign a unique name to a RpcEndpoint. This name can be used
to look up the rpc endpoint within the RpcService.

Remove component endpoint name methods from the HighAvailabilityServices

Remove JobMaster endpoint id

This closes #3596.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/433a345e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/433a345e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/433a345e

Branch: refs/heads/master
Commit: 433a345edccdee29385957841e6513679690a5e9
Parents: 651e856
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Mar 22 09:09:02 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 25 20:50:40 2017 +0200

----------------------------------------------------------------------
 .../highavailability/EmbeddedNonHaServices.java |  6 ----
 .../HighAvailabilityServices.java               |  6 ----
 .../runtime/highavailability/NonHaServices.java | 10 ------
 .../highavailability/ZookeeperHaServices.java   | 10 ------
 .../runtime/jobmaster/JobManagerRunner.java     |  2 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  7 ++--
 .../flink/runtime/minicluster/MiniCluster.java  |  2 ++
 .../resourcemanager/ResourceManager.java        |  5 ++-
 .../resourcemanager/ResourceManagerRunner.java  |  2 ++
 .../StandaloneResourceManager.java              |  2 ++
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 30 ++++++++++++++--
 .../flink/runtime/rpc/RpcServiceUtils.java      | 25 ++++++++++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 38 ++++++++++----------
 .../runtime/taskexecutor/TaskManagerRunner.java |  2 +-
 .../clusterframework/ResourceManagerTest.java   |  4 ++-
 .../TestingHighAvailabilityServices.java        |  5 ---
 .../flink/runtime/jobmaster/JobMasterTest.java  |  2 +-
 .../resourcemanager/ResourceManagerHATest.java  |  8 +++--
 .../ResourceManagerJobMasterTest.java           |  5 ++-
 .../ResourceManagerTaskExecutorTest.java        |  2 ++
 .../slotmanager/SlotProtocolTest.java           |  9 +++--
 .../taskexecutor/TaskExecutorITCase.java        |  7 ++--
 .../runtime/taskexecutor/TaskExecutorTest.java  | 18 +++++-----
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  5 ++-
 .../apache/flink/yarn/YarnResourceManager.java  |  2 ++
 .../yarn/configuration/YarnConfigOptions.java   |  2 +-
 .../AbstractYarnNonHaServices.java              | 12 -------
 .../YarnPreConfiguredMasterNonHaServices.java   |  3 +-
 .../YarnPreConfiguredMasterHaServicesTest.java  | 30 ----------------
 30 files changed, 135 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index a417599..8bf81eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,12 +43,6 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 	// ------------------------------------------------------------------------
 
 	@Override
-	public String getResourceManagerEndpointName() {
-		// dynamic actor name
-		return null;
-	}
-
-	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return resourceManagerLeaderService.createLeaderRetrievalService();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 4169204..3de75d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -57,12 +57,6 @@ public interface HighAvailabilityServices extends AutoCloseable {
 	UUID DEFAULT_LEADER_ID = new UUID(0, 0);
 
 	// ------------------------------------------------------------------------
-	//  Endpoint Naming
-	// ------------------------------------------------------------------------
-
-	String getResourceManagerEndpointName();
-
-	// ------------------------------------------------------------------------
 	//  Services
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index d644fb9..beb5963 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -54,16 +54,6 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
 	}
 
 	// ------------------------------------------------------------------------
-	//  Names
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String getResourceManagerEndpointName() {
-		return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
-	}
-
-
-	// ------------------------------------------------------------------------
 	//  Services
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 741f9e6..4d0db0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -112,16 +112,6 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public String getResourceManagerEndpointName() {
-		// since the resource manager name must be dynamic, we return null here
-		return null;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Services
-	// ------------------------------------------------------------------------
-
-	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 33ee29d..d7fae35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -188,10 +188,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 
 			// now start the JobManager
 			this.jobManager = new JobMaster(
+				rpcService,
 				resourceId,
 				jobGraph,
 				configuration,
-				rpcService,
 				haServices,
 				heartbeatServices,
 				jobManagerServices.executorService,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 080b48e..6fe8cb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -87,6 +88,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -188,10 +190,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
+			RpcService rpcService,
 			ResourceID resourceId,
 			JobGraph jobGraph,
 			Configuration configuration,
-			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityService,
 			HeartbeatServices heartbeatServices,
 			ScheduledExecutorService executor,
@@ -202,7 +204,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler errorHandler,
 			ClassLoader userCodeLoader) throws Exception {
-		super(rpcService);
+
+		super(rpcService, RpcServiceUtils.createRandomName(JobManager.JOB_MANAGER_NAME()));
 
 		this.resourceId = checkNotNull(resourceId);
 		this.jobGraph = checkNotNull(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 9d5f9d1..03fbef5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -524,6 +525,7 @@ public class MiniCluster {
 
 			resourceManagerRunners[i] = new ResourceManagerRunner(
 				ResourceID.generate(),
+				FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + i,
 				configuration,
 				resourceManagerRpcServices[i],
 				haServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5467177..c0ff412 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -131,6 +131,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 	public ResourceManager(
 			RpcService rpcService,
+			String resourceManagerEndpointId,
 			ResourceID resourceId,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
@@ -140,7 +141,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 
-		super(rpcService);
+		super(rpcService, resourceManagerEndpointId);
 
 		this.resourceId = checkNotNull(resourceId);
 		this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
@@ -162,6 +163,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		infoMessageListeners = new ConcurrentHashMap<>(8);
 	}
 
+
+
 	// ------------------------------------------------------------------------
 	//  RPC lifecycle methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 3a8baa6..c6d64db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -46,6 +46,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 	public ResourceManagerRunner(
 			final ResourceID resourceId,
+			final String resourceManagerEndpointId,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices highAvailabilityServices,
@@ -70,6 +71,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 		this.resourceManager = new StandaloneResourceManager(
 			rpcService,
+			resourceManagerEndpointId,
 			resourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index fd5a001..389ca5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -39,6 +39,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(
 			RpcService rpcService,
+			String resourceManagerEndpointId,
 			ResourceID resourceId,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
@@ -49,6 +50,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			FatalErrorHandler fatalErrorHandler) {
 		super(
 			rpcService,
+			resourceManagerEndpointId,
 			resourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index f30e345..40b9568 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +64,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	/** RPC service to be used to start the RPC server and to obtain rpc gateways */
 	private final RpcService rpcService;
 
+	/** Unique identifier for this rpc endpoint */
+	private final String endpointId;
+
 	/** Class of the self gateway */
 	private final Class<C> selfGatewayType;
 
@@ -79,10 +83,12 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	/**
 	 * Initializes the RPC endpoint.
 	 * 
-	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint. 
+	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
+	 * @param endpointId Unique identifier for this endpoint
 	 */
-	protected RpcEndpoint(final RpcService rpcService) {
+	protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
 		this.rpcService = checkNotNull(rpcService, "rpcService");
+		this.endpointId = checkNotNull(endpointId, "endpointId");
 
 		// IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer
 		// requires that selfGatewayType has been initialized
@@ -93,6 +99,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	}
 
 	/**
+	 * Initializes the RPC endpoint with a random endpoint id.
+	 *
+	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
+	 */
+	protected RpcEndpoint(final RpcService rpcService) {
+		this(rpcService, UUID.randomUUID().toString());
+	}
+
+	/**
 	 * Returns the class of the self gateway type.
 	 *
 	 * @return Class of the self gateway type
@@ -100,7 +115,16 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	public final Class<C> getSelfGatewayType() {
 		return selfGatewayType;
 	}
-	
+
+	/**
+	 * Returns the rpc endpoint's identifier.
+	 *
+	 * @return Rpc endpoint's identifier.
+	 */
+	public String getEndpointId() {
+		return endpointId;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Start & Shutdown
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index 018c3ed..e555e7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.NetUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.jboss.netty.channel.ChannelException;
 
 import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -47,6 +49,8 @@ public class RpcServiceUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
 
+	private static final AtomicLong nextNameOffset = new AtomicLong(0L);
+
 	// ------------------------------------------------------------------------
 	//  RPC instantiation
 	// ------------------------------------------------------------------------
@@ -105,7 +109,7 @@ public class RpcServiceUtils {
 	 * @param hostname     The hostname or address where the target RPC service is listening.
 	 * @param port         The port where the target RPC service is listening.
 	 * @param endpointName The name of the RPC endpoint.
-	 * @param config       Teh configuration from which to deduce further settings.
+	 * @param config       The configuration from which to deduce further settings.
 	 *
 	 * @return The RPC URL of the specified RPC endpoint.
 	 */
@@ -144,6 +148,25 @@ public class RpcServiceUtils {
 		return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName);
 	}
 
+	/**
+	 * Creates a random name of the form prefix_X, where X is an increasing number.
+	 *
+	 * @param prefix Prefix string to prepend to the monotonically increasing name offset number
+	 * @return A random name of the form prefix_X where X is an increasing number
+	 */
+	public static String createRandomName(String prefix) {
+		Preconditions.checkNotNull(prefix, "Prefix must not be null.");
+
+		long nameOffset;
+
+		// obtain the next name offset by incrementing it atomically
+		do {
+			nameOffset = nextNameOffset.get();
+		} while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
+
+		return prefix + '_' + nameOffset;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index f5ccdbb..d182b03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -187,7 +187,7 @@ public class AkkaRpcService implements RpcService {
 
 		synchronized (lock) {
 			checkState(!stopped, "RpcService is stopped");
-			actorRef = actorSystem.actorOf(akkaRpcActorProps);
+			actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
 			actors.add(actorRef);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 6ad0bd9..d04cabb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -85,6 +86,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -167,24 +169,24 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ------------------------------------------------------------------------
 
 	public TaskExecutor(
-		TaskManagerConfiguration taskManagerConfiguration,
-		TaskManagerLocation taskManagerLocation,
-		RpcService rpcService,
-		MemoryManager memoryManager,
-		IOManager ioManager,
-		NetworkEnvironment networkEnvironment,
-		HighAvailabilityServices haServices,
-		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
-		TaskManagerMetricGroup taskManagerMetricGroup,
-		BroadcastVariableManager broadcastVariableManager,
-		FileCache fileCache,
-		TaskSlotTable taskSlotTable,
-		JobManagerTable jobManagerTable,
-		JobLeaderService jobLeaderService,
-		FatalErrorHandler fatalErrorHandler) {
-
-		super(rpcService);
+			RpcService rpcService,
+			TaskManagerConfiguration taskManagerConfiguration,
+			TaskManagerLocation taskManagerLocation,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			NetworkEnvironment networkEnvironment,
+			HighAvailabilityServices haServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			TaskManagerMetricGroup taskManagerMetricGroup,
+			BroadcastVariableManager broadcastVariableManager,
+			FileCache fileCache,
+			TaskSlotTable taskSlotTable,
+			JobManagerTable jobManagerTable,
+			JobLeaderService jobLeaderService,
+			FatalErrorHandler fatalErrorHandler) {
+
+		super(rpcService, RpcServiceUtils.createRandomName(TaskManager.TASK_MANAGER_NAME()));
 
 		checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index c99eb91..2be8ff1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -123,9 +123,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
 		this.taskManager = new TaskExecutor(
+			rpcService,
 			taskManagerConfiguration,
 			taskManagerServices.getTaskManagerLocation(),
-			rpcService,
 			taskManagerServices.getMemoryManager(),
 			taskManagerServices.getIOManager(),
 			taskManagerServices.getNetworkEnvironment(),

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 72925bb..3464129 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -394,7 +394,9 @@ public class ResourceManagerTest extends TestLogger {
 
 		try {
 			final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
-				rpcService, resourceManagerResourceID,
+				rpcService,
+				FlinkResourceManager.RESOURCE_MANAGER_NAME,
+				resourceManagerResourceID,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3f9865c..37a86c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -155,11 +155,6 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		return new VoidBlobStore();
 	}
 
-	@Override
-	public String getResourceManagerEndpointName() {
-		throw new UnsupportedOperationException();
-	}
-
 	// ------------------------------------------------------------------------
 	//  Shutdown
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index ee8f51d..8b9b800 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -92,10 +92,10 @@ public class JobMasterTest extends TestLogger {
 
 		try {
 			final JobMaster jobMaster = new JobMaster(
+				rpc,
 				jmResourceId,
 				jobGraph,
 				new Configuration(),
-				rpc,
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index c8e209d..49bc570 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -38,7 +40,7 @@ import static org.mockito.Mockito.mock;
 /**
  * resourceManager HA test, including grant leadership and revoke leadership
  */
-public class ResourceManagerHATest {
+public class ResourceManagerHATest extends TestLogger {
 
 	@Test
 	public void testGrantAndRevokeLeadership() throws Exception {
@@ -68,7 +70,9 @@ public class ResourceManagerHATest {
 
 		final ResourceManager resourceManager =
 			new StandaloneResourceManager(
-				rpcService, rmResourceId,
+				rpcService,
+				FlinkResourceManager.RESOURCE_MANAGER_NAME,
+				rmResourceId,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 32b40ac..b6b8614 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -217,7 +218,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			Time.minutes(5L));
 
 		ResourceManager resourceManager = new StandaloneResourceManager(
-			rpcService, rmResourceId,
+			rpcService,
+			FlinkResourceManager.RESOURCE_MANAGER_NAME,
+			rmResourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index cb0a414..bae7086 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -165,6 +166,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		StandaloneResourceManager resourceManager =
 			new StandaloneResourceManager(
 				rpcService,
+				FlinkResourceManager.RESOURCE_MANAGER_NAME,
 				resourceManagerResourceID,
 				resourceManagerConfiguration,
 				highAvailabilityServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 68aff42..4d2309a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -237,7 +238,9 @@ public class SlotProtocolTest extends TestLogger {
 
 		ResourceManager<ResourceID> resourceManager =
 			Mockito.spy(new StandaloneResourceManager(
-				testRpcService, rmResourceId,
+				testRpcService,
+				FlinkResourceManager.RESOURCE_MANAGER_NAME,
+				rmResourceId,
 				resourceManagerConfiguration,
 				testingHaServices,
 				heartbeatServices,
@@ -324,7 +327,9 @@ public class SlotProtocolTest extends TestLogger {
 				JobLeaderIdService jobLeaderIdService,
 				FatalErrorHandler fatalErrorHandler) {
 			super(
-				rpcService, resourceId,
+				rpcService,
+				FlinkResourceManager.RESOURCE_MANAGER_NAME,
+				resourceId,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index fdcd282..fa326b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -121,7 +122,9 @@ public class TaskExecutorITCase extends TestLogger {
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
 		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
-			rpcService, rmResourceId,
+			rpcService,
+			FlinkResourceManager.RESOURCE_MANAGER_NAME,
+			rmResourceId,
 			resourceManagerConfiguration,
 			testingHAServices,
 			heartbeatServices,
@@ -131,9 +134,9 @@ public class TaskExecutorITCase extends TestLogger {
 			testingFatalErrorHandler);
 
 		TaskExecutor taskExecutor = new TaskExecutor(
+			rpcService,
 			taskManagerConfiguration,
 			taskManagerLocation,
-			rpcService,
 			memoryManager,
 			ioManager,
 			networkEnvironment,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 330d4fb..bc6fe68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -169,9 +169,9 @@ public class TaskExecutorTest extends TestLogger {
 
 		try {
 			final TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				tmConfig,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
@@ -275,9 +275,9 @@ public class TaskExecutorTest extends TestLogger {
 
 		try {
 			final TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerConfiguration,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
@@ -344,9 +344,9 @@ public class TaskExecutorTest extends TestLogger {
 			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 			TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
@@ -427,9 +427,9 @@ public class TaskExecutorTest extends TestLogger {
 			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 			TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
@@ -559,9 +559,9 @@ public class TaskExecutorTest extends TestLogger {
 			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 			TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerConfiguration,
 				mock(TaskManagerLocation.class),
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				networkEnvironment,
@@ -676,9 +676,9 @@ public class TaskExecutorTest extends TestLogger {
 
 		try {
 			TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerConfiguration,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
@@ -790,9 +790,9 @@ public class TaskExecutorTest extends TestLogger {
 
 		try {
 			TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerConfiguration,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
@@ -867,9 +867,9 @@ public class TaskExecutorTest extends TestLogger {
 			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 			TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
@@ -1011,9 +1011,9 @@ public class TaskExecutorTest extends TestLogger {
 
 		try {
 			final TaskExecutor taskManager = new TaskExecutor(
+				rpc,
 				taskManagerConfiguration,
 				taskManagerLocation,
-				rpc,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 4c935f1..65d12b5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -199,7 +200,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			commonRpcService.getScheduledExecutor());
 
 		return new YarnResourceManager(
-			commonRpcService, ResourceID.generate(),
+			commonRpcService,
+			FlinkResourceManager.RESOURCE_MANAGER_NAME,
+			ResourceID.generate(),
 			config,
 			ENV,
 			resourceManagerConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index f8cf275..74359c8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -108,6 +108,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 
 	public YarnResourceManager(
 			RpcService rpcService,
+			String resourceManagerEndpointId,
 			ResourceID resourceId,
 			Configuration flinkConfig,
 			Map<String, String> env,
@@ -120,6 +121,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			FatalErrorHandler fatalErrorHandler) {
 		super(
 			rpcService,
+			resourceManagerEndpointId,
 			resourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index c3902d3..071bb7d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -39,7 +39,7 @@ public class YarnConfigOptions {
 	 * The port where the application master RPC system is listening.
 	 */
 	public static final ConfigOption<Integer> APP_MASTER_RPC_PORT =
-			key("yarn.appmaster.rpc.address")
+			key("yarn.appmaster.rpc.port")
 			.defaultValue(-1);
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
index 7aa481f..f1656aa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
@@ -38,9 +38,6 @@ import java.io.IOException;
  */
 public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServices {
 
-	/** The constant name of the ResourceManager RPC endpoint */
-	protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
-
 	// ------------------------------------------------------------------------
 
 	/**
@@ -62,15 +59,6 @@ public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServ
 	}
 
 	// ------------------------------------------------------------------------
-	//  Names
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String getResourceManagerEndpointName() {
-		return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
-	}
-
-	// ------------------------------------------------------------------------
 	//  Services
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
index eb4b77e..72d780f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn.highavailability;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -107,7 +108,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 			}
 
 			this.resourceManagerRpcUrl = RpcServiceUtils.getRpcUrl(
-					rmHost, rmPort, RESOURCE_MANAGER_RPC_ENDPOINT_NAME, config);
+					rmHost, rmPort, FlinkResourceManager.RESOURCE_MANAGER_NAME, config);
 
 			// all well!
 			successful = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
index a13deac..c730102 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -23,8 +23,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -40,14 +38,9 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.FileNotFoundException;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import static org.mockito.Mockito.*;
-
-
 public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
 
 	@ClassRule
@@ -95,29 +88,6 @@ public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	@Test
-	public void testConstantResourceManagerName() throws Exception {
-		final Configuration flinkConfig = new Configuration();
-		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
-		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
-
-		YarnHighAvailabilityServices services1 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
-		YarnHighAvailabilityServices services2 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
-
-		try {
-			String rmName1 = services1.getResourceManagerEndpointName();
-			String rmName2 = services2.getResourceManagerEndpointName();
-
-			assertNotNull(rmName1);
-			assertNotNull(rmName2);
-			assertEquals(rmName1, rmName2);
-		}
-		finally {
-			services1.closeAndCleanupAllData();
-			services2.closeAndCleanupAllData();
-		}
-	}
-
-	@Test
 	public void testMissingRmConfiguration() throws Exception {
 		final Configuration flinkConfig = new Configuration();