You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/04/25 18:51:44 UTC
flink git commit: [FLINK-6155] Introduce an endpoint id for
RpcEndpoints
Repository: flink
Updated Branches:
refs/heads/master 651e8561f -> 433a345ed
[FLINK-6155] Introduce an endpoint id for RpcEndpoints
An endpoint id allows assigning a unique name to an RpcEndpoint. This name can be used
to look up the RPC endpoint within the RpcService.
Remove component endpoint name methods from the HighAvailabilityServices
Remove JobMaster endpoint id
This closes #3596.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/433a345e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/433a345e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/433a345e
Branch: refs/heads/master
Commit: 433a345edccdee29385957841e6513679690a5e9
Parents: 651e856
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Mar 22 09:09:02 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 25 20:50:40 2017 +0200
----------------------------------------------------------------------
.../highavailability/EmbeddedNonHaServices.java | 6 ----
.../HighAvailabilityServices.java | 6 ----
.../runtime/highavailability/NonHaServices.java | 10 ------
.../highavailability/ZookeeperHaServices.java | 10 ------
.../runtime/jobmaster/JobManagerRunner.java | 2 +-
.../flink/runtime/jobmaster/JobMaster.java | 7 ++--
.../flink/runtime/minicluster/MiniCluster.java | 2 ++
.../resourcemanager/ResourceManager.java | 5 ++-
.../resourcemanager/ResourceManagerRunner.java | 2 ++
.../StandaloneResourceManager.java | 2 ++
.../apache/flink/runtime/rpc/RpcEndpoint.java | 30 ++++++++++++++--
.../flink/runtime/rpc/RpcServiceUtils.java | 25 ++++++++++++-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 2 +-
.../runtime/taskexecutor/TaskExecutor.java | 38 ++++++++++----------
.../runtime/taskexecutor/TaskManagerRunner.java | 2 +-
.../clusterframework/ResourceManagerTest.java | 4 ++-
.../TestingHighAvailabilityServices.java | 5 ---
.../flink/runtime/jobmaster/JobMasterTest.java | 2 +-
.../resourcemanager/ResourceManagerHATest.java | 8 +++--
.../ResourceManagerJobMasterTest.java | 5 ++-
.../ResourceManagerTaskExecutorTest.java | 2 ++
.../slotmanager/SlotProtocolTest.java | 9 +++--
.../taskexecutor/TaskExecutorITCase.java | 7 ++--
.../runtime/taskexecutor/TaskExecutorTest.java | 18 +++++-----
.../yarn/YarnFlinkApplicationMasterRunner.java | 5 ++-
.../apache/flink/yarn/YarnResourceManager.java | 2 ++
.../yarn/configuration/YarnConfigOptions.java | 2 +-
.../AbstractYarnNonHaServices.java | 12 -------
.../YarnPreConfiguredMasterNonHaServices.java | 3 +-
.../YarnPreConfiguredMasterHaServicesTest.java | 30 ----------------
30 files changed, 135 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index a417599..8bf81eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,12 +43,6 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
// ------------------------------------------------------------------------
@Override
- public String getResourceManagerEndpointName() {
- // dynamic actor name
- return null;
- }
-
- @Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return resourceManagerLeaderService.createLeaderRetrievalService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 4169204..3de75d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -57,12 +57,6 @@ public interface HighAvailabilityServices extends AutoCloseable {
UUID DEFAULT_LEADER_ID = new UUID(0, 0);
// ------------------------------------------------------------------------
- // Endpoint Naming
- // ------------------------------------------------------------------------
-
- String getResourceManagerEndpointName();
-
- // ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index d644fb9..beb5963 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -54,16 +54,6 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
}
// ------------------------------------------------------------------------
- // Names
- // ------------------------------------------------------------------------
-
- @Override
- public String getResourceManagerEndpointName() {
- return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
- }
-
-
- // ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 741f9e6..4d0db0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -112,16 +112,6 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
// ------------------------------------------------------------------------
@Override
- public String getResourceManagerEndpointName() {
- // since the resource manager name must be dynamic, we return null here
- return null;
- }
-
- // ------------------------------------------------------------------------
- // Services
- // ------------------------------------------------------------------------
-
- @Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 33ee29d..d7fae35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -188,10 +188,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
// now start the JobManager
this.jobManager = new JobMaster(
+ rpcService,
resourceId,
jobGraph,
configuration,
- rpcService,
haServices,
heartbeatServices,
jobManagerServices.executorService,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 080b48e..6fe8cb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -87,6 +88,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -188,10 +190,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// ------------------------------------------------------------------------
public JobMaster(
+ RpcService rpcService,
ResourceID resourceId,
JobGraph jobGraph,
Configuration configuration,
- RpcService rpcService,
HighAvailabilityServices highAvailabilityService,
HeartbeatServices heartbeatServices,
ScheduledExecutorService executor,
@@ -202,7 +204,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
OnCompletionActions jobCompletionActions,
FatalErrorHandler errorHandler,
ClassLoader userCodeLoader) throws Exception {
- super(rpcService);
+
+ super(rpcService, RpcServiceUtils.createRandomName(JobManager.JOB_MANAGER_NAME()));
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 9d5f9d1..03fbef5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -524,6 +525,7 @@ public class MiniCluster {
resourceManagerRunners[i] = new ResourceManagerRunner(
ResourceID.generate(),
+ FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + i,
configuration,
resourceManagerRpcServices[i],
haServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5467177..c0ff412 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -131,6 +131,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
public ResourceManager(
RpcService rpcService,
+ String resourceManagerEndpointId,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
@@ -140,7 +141,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
- super(rpcService);
+ super(rpcService, resourceManagerEndpointId);
this.resourceId = checkNotNull(resourceId);
this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
@@ -162,6 +163,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
infoMessageListeners = new ConcurrentHashMap<>(8);
}
+
+
// ------------------------------------------------------------------------
// RPC lifecycle methods
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 3a8baa6..c6d64db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -46,6 +46,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
public ResourceManagerRunner(
final ResourceID resourceId,
+ final String resourceManagerEndpointId,
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices highAvailabilityServices,
@@ -70,6 +71,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
this.resourceManager = new StandaloneResourceManager(
rpcService,
+ resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index fd5a001..389ca5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -39,6 +39,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
public StandaloneResourceManager(
RpcService rpcService,
+ String resourceManagerEndpointId,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
@@ -49,6 +50,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
FatalErrorHandler fatalErrorHandler) {
super(
rpcService,
+ resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index f30e345..40b9568 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -63,6 +64,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
/** RPC service to be used to start the RPC server and to obtain rpc gateways */
private final RpcService rpcService;
+ /** Unique identifier for this rpc endpoint */
+ private final String endpointId;
+
/** Class of the self gateway */
private final Class<C> selfGatewayType;
@@ -79,10 +83,12 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
/**
* Initializes the RPC endpoint.
*
- * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
+ * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
+ * @param endpointId Unique identifier for this endpoint
*/
- protected RpcEndpoint(final RpcService rpcService) {
+ protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
+ this.endpointId = checkNotNull(endpointId, "endpointId");
// IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer
// requires that selfGatewayType has been initialized
@@ -93,6 +99,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
}
/**
+ * Initializes the RPC endpoint with a random endpoint id.
+ *
+ * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
+ */
+ protected RpcEndpoint(final RpcService rpcService) {
+ this(rpcService, UUID.randomUUID().toString());
+ }
+
+ /**
* Returns the class of the self gateway type.
*
* @return Class of the self gateway type
@@ -100,7 +115,16 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
public final Class<C> getSelfGatewayType() {
return selfGatewayType;
}
-
+
+ /**
+ * Returns the rpc endpoint's identifier.
+ *
+ * @return Rpc endpoint's identifier.
+ */
+ public String getEndpointId() {
+ return endpointId;
+ }
+
// ------------------------------------------------------------------------
// Start & Shutdown
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index 018c3ed..e555e7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -47,6 +49,8 @@ public class RpcServiceUtils {
private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
+ private static final AtomicLong nextNameOffset = new AtomicLong(0L);
+
// ------------------------------------------------------------------------
// RPC instantiation
// ------------------------------------------------------------------------
@@ -105,7 +109,7 @@ public class RpcServiceUtils {
* @param hostname The hostname or address where the target RPC service is listening.
* @param port The port where the target RPC service is listening.
* @param endpointName The name of the RPC endpoint.
- * @param config Teh configuration from which to deduce further settings.
+ * @param config The configuration from which to deduce further settings.
*
* @return The RPC URL of the specified RPC endpoint.
*/
@@ -144,6 +148,25 @@ public class RpcServiceUtils {
return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName);
}
+ /**
+ * Creates a random name of the form prefix_X, where X is an increasing number.
+ *
+ * @param prefix Prefix string to prepend to the monotonically increasing name offset number
+ * @return A random name of the form prefix_X where X is an increasing number
+ */
+ public static String createRandomName(String prefix) {
+ Preconditions.checkNotNull(prefix, "Prefix must not be null.");
+
+ long nameOffset;
+
+ // obtain the next name offset by incrementing it atomically
+ do {
+ nameOffset = nextNameOffset.get();
+ } while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
+
+ return prefix + '_' + nameOffset;
+ }
+
// ------------------------------------------------------------------------
/** This class is not meant to be instantiated */
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index f5ccdbb..d182b03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -187,7 +187,7 @@ public class AkkaRpcService implements RpcService {
synchronized (lock) {
checkState(!stopped, "RpcService is stopped");
- actorRef = actorSystem.actorOf(akkaRpcActorProps);
+ actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
actors.add(actorRef);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 6ad0bd9..d04cabb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -85,6 +86,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
@@ -167,24 +169,24 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ------------------------------------------------------------------------
public TaskExecutor(
- TaskManagerConfiguration taskManagerConfiguration,
- TaskManagerLocation taskManagerLocation,
- RpcService rpcService,
- MemoryManager memoryManager,
- IOManager ioManager,
- NetworkEnvironment networkEnvironment,
- HighAvailabilityServices haServices,
- HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
- TaskManagerMetricGroup taskManagerMetricGroup,
- BroadcastVariableManager broadcastVariableManager,
- FileCache fileCache,
- TaskSlotTable taskSlotTable,
- JobManagerTable jobManagerTable,
- JobLeaderService jobLeaderService,
- FatalErrorHandler fatalErrorHandler) {
-
- super(rpcService);
+ RpcService rpcService,
+ TaskManagerConfiguration taskManagerConfiguration,
+ TaskManagerLocation taskManagerLocation,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment,
+ HighAvailabilityServices haServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ TaskManagerMetricGroup taskManagerMetricGroup,
+ BroadcastVariableManager broadcastVariableManager,
+ FileCache fileCache,
+ TaskSlotTable taskSlotTable,
+ JobManagerTable jobManagerTable,
+ JobLeaderService jobLeaderService,
+ FatalErrorHandler fatalErrorHandler) {
+
+ super(rpcService, RpcServiceUtils.createRandomName(TaskManager.TASK_MANAGER_NAME()));
checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index c99eb91..2be8ff1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -123,9 +123,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
this.taskManager = new TaskExecutor(
+ rpcService,
taskManagerConfiguration,
taskManagerServices.getTaskManagerLocation(),
- rpcService,
taskManagerServices.getMemoryManager(),
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment(),
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 72925bb..3464129 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -394,7 +394,9 @@ public class ResourceManagerTest extends TestLogger {
try {
final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
- rpcService, resourceManagerResourceID,
+ rpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ resourceManagerResourceID,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3f9865c..37a86c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -155,11 +155,6 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
return new VoidBlobStore();
}
- @Override
- public String getResourceManagerEndpointName() {
- throw new UnsupportedOperationException();
- }
-
// ------------------------------------------------------------------------
// Shutdown
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index ee8f51d..8b9b800 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -92,10 +92,10 @@ public class JobMasterTest extends TestLogger {
try {
final JobMaster jobMaster = new JobMaster(
+ rpc,
jmResourceId,
jobGraph,
new Configuration(),
- rpc,
haServices,
heartbeatServices,
Executors.newScheduledThreadPool(1),
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index c8e209d..49bc570 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
@@ -38,7 +40,7 @@ import static org.mockito.Mockito.mock;
/**
* resourceManager HA test, including grant leadership and revoke leadership
*/
-public class ResourceManagerHATest {
+public class ResourceManagerHATest extends TestLogger {
@Test
public void testGrantAndRevokeLeadership() throws Exception {
@@ -68,7 +70,9 @@ public class ResourceManagerHATest {
final ResourceManager resourceManager =
new StandaloneResourceManager(
- rpcService, rmResourceId,
+ rpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ rmResourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 32b40ac..b6b8614 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -217,7 +218,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
Time.minutes(5L));
ResourceManager resourceManager = new StandaloneResourceManager(
- rpcService, rmResourceId,
+ rpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ rmResourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index cb0a414..bae7086 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -165,6 +166,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
StandaloneResourceManager resourceManager =
new StandaloneResourceManager(
rpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
resourceManagerResourceID,
resourceManagerConfiguration,
highAvailabilityServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 68aff42..4d2309a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -237,7 +238,9 @@ public class SlotProtocolTest extends TestLogger {
ResourceManager<ResourceID> resourceManager =
Mockito.spy(new StandaloneResourceManager(
- testRpcService, rmResourceId,
+ testRpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ rmResourceId,
resourceManagerConfiguration,
testingHaServices,
heartbeatServices,
@@ -324,7 +327,9 @@ public class SlotProtocolTest extends TestLogger {
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
- rpcService, resourceId,
+ rpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index fdcd282..fa326b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -121,7 +122,9 @@ public class TaskExecutorITCase extends TestLogger {
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
- rpcService, rmResourceId,
+ rpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ rmResourceId,
resourceManagerConfiguration,
testingHAServices,
heartbeatServices,
@@ -131,9 +134,9 @@ public class TaskExecutorITCase extends TestLogger {
testingFatalErrorHandler);
TaskExecutor taskExecutor = new TaskExecutor(
+ rpcService,
taskManagerConfiguration,
taskManagerLocation,
- rpcService,
memoryManager,
ioManager,
networkEnvironment,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 330d4fb..bc6fe68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -169,9 +169,9 @@ public class TaskExecutorTest extends TestLogger {
try {
final TaskExecutor taskManager = new TaskExecutor(
+ rpc,
tmConfig,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
@@ -275,9 +275,9 @@ public class TaskExecutorTest extends TestLogger {
try {
final TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerConfiguration,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
@@ -344,9 +344,9 @@ public class TaskExecutorTest extends TestLogger {
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerServicesConfiguration,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
@@ -427,9 +427,9 @@ public class TaskExecutorTest extends TestLogger {
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerServicesConfiguration,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
@@ -559,9 +559,9 @@ public class TaskExecutorTest extends TestLogger {
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerConfiguration,
mock(TaskManagerLocation.class),
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
networkEnvironment,
@@ -676,9 +676,9 @@ public class TaskExecutorTest extends TestLogger {
try {
TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerConfiguration,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
@@ -790,9 +790,9 @@ public class TaskExecutorTest extends TestLogger {
try {
TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerConfiguration,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
@@ -867,9 +867,9 @@ public class TaskExecutorTest extends TestLogger {
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerServicesConfiguration,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
@@ -1011,9 +1011,9 @@ public class TaskExecutorTest extends TestLogger {
try {
final TaskExecutor taskManager = new TaskExecutor(
+ rpc,
taskManagerConfiguration,
taskManagerLocation,
- rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 4c935f1..65d12b5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -199,7 +200,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
commonRpcService.getScheduledExecutor());
return new YarnResourceManager(
- commonRpcService, ResourceID.generate(),
+ commonRpcService,
+ FlinkResourceManager.RESOURCE_MANAGER_NAME,
+ ResourceID.generate(),
config,
ENV,
resourceManagerConfiguration,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index f8cf275..74359c8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -108,6 +108,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
public YarnResourceManager(
RpcService rpcService,
+ String resourceManagerEndpointId,
ResourceID resourceId,
Configuration flinkConfig,
Map<String, String> env,
@@ -120,6 +121,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
FatalErrorHandler fatalErrorHandler) {
super(
rpcService,
+ resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index c3902d3..071bb7d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -39,7 +39,7 @@ public class YarnConfigOptions {
* The port where the application master RPC system is listening.
*/
public static final ConfigOption<Integer> APP_MASTER_RPC_PORT =
- key("yarn.appmaster.rpc.address")
+ key("yarn.appmaster.rpc.port")
.defaultValue(-1);
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
index 7aa481f..f1656aa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
@@ -38,9 +38,6 @@ import java.io.IOException;
*/
public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServices {
- /** The constant name of the ResourceManager RPC endpoint */
- protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
-
// ------------------------------------------------------------------------
/**
@@ -62,15 +59,6 @@ public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServ
}
// ------------------------------------------------------------------------
- // Names
- // ------------------------------------------------------------------------
-
- @Override
- public String getResourceManagerEndpointName() {
- return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
- }
-
- // ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
index eb4b77e..72d780f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn.highavailability;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -107,7 +108,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
}
this.resourceManagerRpcUrl = RpcServiceUtils.getRpcUrl(
- rmHost, rmPort, RESOURCE_MANAGER_RPC_ENDPOINT_NAME, config);
+ rmHost, rmPort, FlinkResourceManager.RESOURCE_MANAGER_NAME, config);
// all well!
successful = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/433a345e/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
index a13deac..c730102 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -23,8 +23,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -40,14 +38,9 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileNotFoundException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
-
public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
@ClassRule
@@ -95,29 +88,6 @@ public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
// ------------------------------------------------------------------------
@Test
- public void testConstantResourceManagerName() throws Exception {
- final Configuration flinkConfig = new Configuration();
- flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
- flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
-
- YarnHighAvailabilityServices services1 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
- YarnHighAvailabilityServices services2 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
-
- try {
- String rmName1 = services1.getResourceManagerEndpointName();
- String rmName2 = services2.getResourceManagerEndpointName();
-
- assertNotNull(rmName1);
- assertNotNull(rmName2);
- assertEquals(rmName1, rmName2);
- }
- finally {
- services1.closeAndCleanupAllData();
- services2.closeAndCleanupAllData();
- }
- }
-
- @Test
public void testMissingRmConfiguration() throws Exception {
final Configuration flinkConfig = new Configuration();