You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/03 06:58:18 UTC
[41/50] [abbrv] hadoop git commit: YARN-6102. RMActiveService context
to be updated with new RMContext on failover. Contributed by Rohith Sharma K
S.
YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S.
(cherry picked from commit a657472b42c58f87fd3165e0a746d83b72182a24)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c602f05b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c602f05b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c602f05b
Branch: refs/heads/YARN-5355_branch2
Commit: c602f05b82a4f44448ec11efe42d2d90d3b2cb59
Parents: bb4f440
Author: Sunil G <su...@apache.org>
Authored: Mon Jul 24 20:57:25 2017 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Fri Jul 28 22:48:33 2017 +0530
----------------------------------------------------------------------
...ActiveStandbyElectorBasedElectorService.java | 12 +-
.../server/resourcemanager/AdminService.java | 71 ++---
.../CuratorBasedElectorService.java | 10 +-
.../resourcemanager/RMActiveServiceContext.java | 15 +
.../server/resourcemanager/RMContextImpl.java | 294 ++++++++++---------
.../resourcemanager/RMServiceContext.java | 151 ++++++++++
.../server/resourcemanager/ResourceManager.java | 28 +-
.../yarn/server/resourcemanager/MockRM.java | 2 +-
.../resourcemanager/TestRMEmbeddedElector.java | 8 +-
.../yarn/server/resourcemanager/TestRMHA.java | 16 +-
10 files changed, 406 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
index 751eedd..7e41399 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
- private RMContext rmContext;
+ private ResourceManager rm;
private byte[] localActiveNodeInfo;
private ActiveStandbyElector elector;
@@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@VisibleForTesting
final Object zkDisconnectLock = new Object();
- ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
+ ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
super(ActiveStandbyElectorBasedElectorService.class.getName());
- this.rmContext = rmContext;
+ this.rm = rm;
}
@Override
@@ -139,7 +139,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
cancelDisconnectTimer();
try {
- rmContext.getRMAdminService().transitionToActive(req);
+ rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e);
}
@@ -150,7 +150,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
cancelDisconnectTimer();
try {
- rmContext.getRMAdminService().transitionToStandby(req);
+ rm.getRMContext().getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
LOG.error("RM could not transition to Standby", e);
}
@@ -204,7 +204,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@SuppressWarnings(value = "unchecked")
@Override
public void notifyFatalError(String errorMessage) {
- rmContext.getDispatcher().getEventHandler().handle(
+ rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 74c87a2..afea100 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements
private static final Log LOG = LogFactory.getLog(AdminService.class);
- private final RMContext rmContext;
private final ResourceManager rm;
private String rmId;
@@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
- public AdminService(ResourceManager rm, RMContext rmContext) {
+ public AdminService(ResourceManager rm) {
super(AdminService.class.getName());
this.rm = rm;
- this.rmContext = rmContext;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
autoFailoverEnabled =
- rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
+ rm.getRMContext().isHAEnabled()
+ && HAUtil.isAutomaticFailoverEnabled(conf);
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
@@ -189,7 +188,7 @@ public class AdminService extends CompositeService implements
RMPolicyProvider.getInstance());
}
- if (rmContext.isHAEnabled()) {
+ if (rm.getRMContext().isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
@@ -265,7 +264,7 @@ public class AdminService extends CompositeService implements
}
private synchronized boolean isRMActive() {
- return HAServiceState.ACTIVE == rmContext.getHAServiceState();
+ return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState();
}
private void throwStandbyException() throws StandbyException {
@@ -305,7 +304,7 @@ public class AdminService extends CompositeService implements
refreshAll();
} catch (Exception e) {
LOG.error("RefreshAll failed so firing fatal event", e);
- rmContext
+ rm.getRMContext()
.getDispatcher()
.getEventHandler()
.handle(
@@ -364,7 +363,7 @@ public class AdminService extends CompositeService implements
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
- HAServiceState haState = rmContext.getHAServiceState();
+ HAServiceState haState = rm.getRMContext().getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
@@ -396,11 +395,12 @@ public class AdminService extends CompositeService implements
}
private void refreshQueues() throws IOException, YarnException {
- rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+ rm.getRMContext().getScheduler().reinitialize(getConfig(),
+ this.rm.getRMContext());
// refresh the reservation system
- ReservationSystem rSystem = rmContext.getReservationSystem();
+ ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
if (rSystem != null) {
- rSystem.reinitialize(getConfig(), rmContext);
+ rSystem.reinitialize(getConfig(), rm.getRMContext());
}
}
@@ -419,14 +419,14 @@ public class AdminService extends CompositeService implements
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
switch (request.getDecommissionType()) {
case NORMAL:
- rmContext.getNodesListManager().refreshNodes(conf);
+ rm.getRMContext().getNodesListManager().refreshNodes(conf);
break;
case GRACEFUL:
- rmContext.getNodesListManager().refreshNodesGracefully(
+ rm.getRMContext().getNodesListManager().refreshNodesGracefully(
conf, request.getDecommissionTimeout());
break;
case FORCEFUL:
- rmContext.getNodesListManager().refreshNodesForcefully();
+ rm.getRMContext().getNodesListManager().refreshNodesForcefully();
break;
}
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
@@ -441,7 +441,7 @@ public class AdminService extends CompositeService implements
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- rmContext.getNodesListManager().refreshNodes(conf);
+ rm.getRMContext().getNodesListManager().refreshNodes(conf);
}
@Override
@@ -560,10 +560,11 @@ public class AdminService extends CompositeService implements
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
- rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
- rmContext.getApplicationMasterService().refreshServiceAcls(
+ rm.getRMContext().getClientRMService().refreshServiceAcls(conf,
+ policyProvider);
+ rm.getRMContext().getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
- rmContext.getResourceTrackerService().refreshServiceAcls(
+ rm.getRMContext().getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
}
@@ -602,7 +603,7 @@ public class AdminService extends CompositeService implements
// if any invalid nodes, throw exception instead of partially updating
// valid nodes.
for (NodeId nodeId : nodeIds) {
- RMNode node = this.rmContext.getRMNodes().get(nodeId);
+ RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.error("Resource update get failed on all nodes due to change "
+ "resource on an unrecognized node: " + nodeId);
@@ -620,14 +621,14 @@ public class AdminService extends CompositeService implements
for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
ResourceOption newResourceOption = entry.getValue();
NodeId nodeId = entry.getKey();
- RMNode node = this.rmContext.getRMNodes().get(nodeId);
+ RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
allSuccess = false;
} else {
// update resource to RMNode
- this.rmContext.getDispatcher().getEventHandler()
+ this.rm.getRMContext().getDispatcher().getEventHandler()
.handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
LOG.info("Update resource on node(" + node.getNodeID()
+ ") with resource(" + newResourceOption.toString() + ")");
@@ -662,7 +663,8 @@ public class AdminService extends CompositeService implements
DynamicResourceConfiguration newConf;
InputStream drInputStream =
- this.rmContext.getConfigurationProvider().getConfigurationInputStream(
+ this.rm.getRMContext().getConfigurationProvider()
+ .getConfigurationInputStream(
configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
if (drInputStream != null) {
@@ -680,7 +682,7 @@ public class AdminService extends CompositeService implements
updateNodeResource(updateRequest);
}
// refresh dynamic resource in ResourceTrackerService
- this.rmContext.getResourceTrackerService().
+ this.rm.getRMContext().getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@@ -693,7 +695,8 @@ public class AdminService extends CompositeService implements
private synchronized Configuration getConfiguration(Configuration conf,
String... confFileNames) throws YarnException, IOException {
for (String confFileName : confFileNames) {
- InputStream confFileInputStream = this.rmContext.getConfigurationProvider()
+ InputStream confFileInputStream =
+ this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(conf, confFileName);
if (confFileInputStream != null) {
conf.addResource(confFileInputStream);
@@ -747,7 +750,7 @@ public class AdminService extends CompositeService implements
AddToClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
try {
- rmContext.getNodeLabelManager()
+ rm.getRMContext().getNodeLabelManager()
.addToCluserNodeLabels(request.getNodeLabels());
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@@ -770,7 +773,8 @@ public class AdminService extends CompositeService implements
RemoveFromClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
try {
- rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
+ rm.getRMContext().getNodeLabelManager()
+ .removeFromClusterNodeLabels(request.getNodeLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
return response;
@@ -806,19 +810,20 @@ public class AdminService extends CompositeService implements
boolean isKnown = false;
// both active and inactive nodes are recognized as known nodes
if (requestedNode.getPort() != 0) {
- if (rmContext.getRMNodes().containsKey(requestedNode)
- || rmContext.getInactiveRMNodes().containsKey(requestedNode)) {
+ if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm
+ .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) {
isKnown = true;
}
} else {
- for (NodeId knownNode : rmContext.getRMNodes().keySet()) {
+ for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
}
}
if (!isKnown) {
- for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) {
+ for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes()
+ .keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
@@ -842,7 +847,7 @@ public class AdminService extends CompositeService implements
}
}
try {
- rmContext.getNodeLabelManager().replaceLabelsOnNode(
+ rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(
request.getNodeToLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
@@ -879,7 +884,7 @@ public class AdminService extends CompositeService implements
checkRMStatus(user.getShortUserName(), operation, msg);
- Set<NodeId> decommissioningNodes = rmContext.getNodesListManager()
+ Set<NodeId> decommissioningNodes = rm.getRMContext().getNodesListManager()
.checkForDecommissioningNodes();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@@ -915,6 +920,6 @@ public class AdminService extends CompositeService implements
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- rmContext.getScheduler().setClusterMaxPriority(conf);
+ rm.getRMContext().getScheduler().setClusterMaxPriority(conf);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
index bcdf48b..d7485f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
@@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService
LogFactory.getLog(CuratorBasedElectorService.class);
private LeaderLatch leaderLatch;
private CuratorFramework curator;
- private RMContext rmContext;
private String latchPath;
private String rmId;
private ResourceManager rm;
- public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
+ public CuratorBasedElectorService(ResourceManager rm) {
super(CuratorBasedElectorService.class.getName());
- this.rmContext = rmContext;
this.rm = rm;
}
@@ -102,7 +100,8 @@ public class CuratorBasedElectorService extends AbstractService
public void isLeader() {
LOG.info(rmId + "is elected leader, transitioning to active");
try {
- rmContext.getRMAdminService().transitionToActive(
+ rm.getRMContext().getRMAdminService()
+ .transitionToActive(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
@@ -123,7 +122,8 @@ public class CuratorBasedElectorService extends AbstractService
public void notLeader() {
LOG.info(rmId + " relinquish leadership");
try {
- rmContext.getRMAdminService().transitionToStandby(
+ rm.getRMContext().getRMAdminService()
+ .transitionToStandby(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 0e305a9..4844eba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -107,6 +108,7 @@ public class RMActiveServiceContext {
private PlacementManager queuePlacementManager = null;
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
+ private QueueLimitCalculator queueLimitCalculator;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@@ -483,4 +485,17 @@ public class RMActiveServiceContext {
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.rmAppLifetimeMonitor;
}
+
+ @Private
+ @Unstable
+ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+ return this.queueLimitCalculator;
+ }
+
+ @Private
+ @Unstable
+ public void setContainerQueueLimitCalculator(
+ QueueLimitCalculator limitCalculator) {
+ this.queueLimitCalculator = limitCalculator;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index fb160c4..ab3672e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -57,37 +57,39 @@ import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
+/**
+ * RMContextImpl class holds two service contexts.
+ * <ul>
+ * <li>serviceContext : These services are called <b>Always On</b> services.
+ * Services that need to run always irrespective of the HA state of the RM.</li>
+ * <li>activeServiceContext : Active services context. Services that need to run
+ * only on the Active RM.</li>
+ * </ul>
+ * <p>
+ * <b>Note:</b> If a new service is to be added to the context, add it to the
+ * right context as per the above description.
+ */
public class RMContextImpl implements RMContext {
- private Dispatcher rmDispatcher;
-
- private boolean isHAEnabled;
-
- private HAServiceState haServiceState =
- HAServiceProtocol.HAServiceState.INITIALIZING;
-
- private AdminService adminService;
-
- private ConfigurationProvider configurationProvider;
+ /**
+ * RM service context which runs throughout the RM life span. It is created
+ * once during start of RM.
+ */
+ private RMServiceContext serviceContext;
+ /**
+ * RM Active service context. This will be recreated for every transition from
+ * ACTIVE->STANDBY.
+ */
private RMActiveServiceContext activeServiceContext;
- private Configuration yarnConfiguration;
-
- private RMApplicationHistoryWriter rmApplicationHistoryWriter;
- private SystemMetricsPublisher systemMetricsPublisher;
- private EmbeddedElector elector;
-
- private QueueLimitCalculator queueLimitCalculator;
-
- private final Object haServiceStateLock = new Object();
-
- private ResourceManager resourceManager;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
*/
public RMContextImpl() {
+ this.serviceContext = new RMServiceContext();
+ this.activeServiceContext = new RMActiveServiceContext();
}
@VisibleForTesting
@@ -138,19 +140,143 @@ public class RMContextImpl implements RMContext {
clientToAMTokenSecretManager, null);
}
+ /**
+ * RM service context which runs throughout the JVM life span. It is created
+ * once during start of RM.
+ * @return serviceContext of RM
+ */
+ @Private
+ @Unstable
+ public RMServiceContext getServiceContext() {
+ return serviceContext;
+ }
+
+ /**
+ * <b>Note:</b> setting service context clears all services embedded with it.
+ * @param context rm service context
+ */
+ @Private
+ @Unstable
+ public void setServiceContext(RMServiceContext context) {
+ this.serviceContext = context;
+ }
+
@Override
- public Dispatcher getDispatcher() {
- return this.rmDispatcher;
+ public ResourceManager getResourceManager() {
+ return serviceContext.getResourceManager();
+ }
+
+ public void setResourceManager(ResourceManager rm) {
+ serviceContext.setResourceManager(rm);
+ }
+
+ @Override
+ public EmbeddedElector getLeaderElectorService() {
+ return serviceContext.getLeaderElectorService();
}
@Override
public void setLeaderElectorService(EmbeddedElector elector) {
- this.elector = elector;
+ serviceContext.setLeaderElectorService(elector);
}
@Override
- public EmbeddedElector getLeaderElectorService() {
- return this.elector;
+ public Dispatcher getDispatcher() {
+ return serviceContext.getDispatcher();
+ }
+
+ void setDispatcher(Dispatcher dispatcher) {
+ serviceContext.setDispatcher(dispatcher);
+ }
+
+ @Override
+ public AdminService getRMAdminService() {
+ return serviceContext.getRMAdminService();
+ }
+
+ void setRMAdminService(AdminService adminService) {
+ serviceContext.setRMAdminService(adminService);
+ }
+
+ @Override
+ public boolean isHAEnabled() {
+ return serviceContext.isHAEnabled();
+ }
+
+ void setHAEnabled(boolean isHAEnabled) {
+ serviceContext.setHAEnabled(isHAEnabled);
+ }
+
+ @Override
+ public HAServiceState getHAServiceState() {
+ return serviceContext.getHAServiceState();
+ }
+
+ void setHAServiceState(HAServiceState serviceState) {
+ serviceContext.setHAServiceState(serviceState);
+ }
+
+ @Override
+ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+ return serviceContext.getRMApplicationHistoryWriter();
+ }
+
+ @Override
+ public void setRMApplicationHistoryWriter(
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+ }
+
+ @Override
+ public SystemMetricsPublisher getSystemMetricsPublisher() {
+ return serviceContext.getSystemMetricsPublisher();
+ }
+
+ @Override
+ public void setSystemMetricsPublisher(
+ SystemMetricsPublisher metricsPublisher) {
+ serviceContext.setSystemMetricsPublisher(metricsPublisher);
+ }
+
+ @Override
+ public ConfigurationProvider getConfigurationProvider() {
+ return serviceContext.getConfigurationProvider();
+ }
+
+ public void setConfigurationProvider(
+ ConfigurationProvider configurationProvider) {
+ serviceContext.setConfigurationProvider(configurationProvider);
+ }
+
+ @Override
+ public Configuration getYarnConfiguration() {
+ return serviceContext.getYarnConfiguration();
+ }
+
+ public void setYarnConfiguration(Configuration yarnConfiguration) {
+ serviceContext.setYarnConfiguration(yarnConfiguration);
+ }
+
+ public String getHAZookeeperConnectionState() {
+ return serviceContext.getHAZookeeperConnectionState();
+ }
+
+ // ==========================================================================
+ /**
+ * RM Active service context. This will be recreated for every transition from
+ * ACTIVE to STANDBY.
+ * @return activeServiceContext of active services
+ */
+ @Private
+ @Unstable
+ public RMActiveServiceContext getActiveServiceContext() {
+ return activeServiceContext;
+ }
+
+ @Private
+ @Unstable
+ void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
+ this.activeServiceContext = activeServiceContext;
}
@Override
@@ -228,11 +354,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getClientToAMTokenSecretManager();
}
- @Override
- public AdminService getRMAdminService() {
- return this.adminService;
- }
-
@VisibleForTesting
public void setStateStore(RMStateStore store) {
activeServiceContext.setStateStore(store);
@@ -253,24 +374,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getResourceTrackerService();
}
- void setHAEnabled(boolean isHAEnabled) {
- this.isHAEnabled = isHAEnabled;
- }
-
- void setHAServiceState(HAServiceState serviceState) {
- synchronized (haServiceStateLock) {
- this.haServiceState = serviceState;
- }
- }
-
- void setDispatcher(Dispatcher dispatcher) {
- this.rmDispatcher = dispatcher;
- }
-
- void setRMAdminService(AdminService adminService) {
- this.adminService = adminService;
- }
-
@Override
public void setClientRMService(ClientRMService clientRMService) {
activeServiceContext.setClientRMService(clientRMService);
@@ -348,18 +451,6 @@ public class RMContextImpl implements RMContext {
activeServiceContext.setResourceTrackerService(resourceTrackerService);
}
- @Override
- public boolean isHAEnabled() {
- return isHAEnabled;
- }
-
- @Override
- public HAServiceState getHAServiceState() {
- synchronized (haServiceStateLock) {
- return haServiceState;
- }
- }
-
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
}
@@ -370,11 +461,6 @@ public class RMContextImpl implements RMContext {
}
@Override
- public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
- return this.rmApplicationHistoryWriter;
- }
-
- @Override
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager) {
activeServiceContext.setRMTimelineCollectorManager(
@@ -382,39 +468,6 @@ public class RMContextImpl implements RMContext {
}
@Override
- public RMTimelineCollectorManager getRMTimelineCollectorManager() {
- return activeServiceContext.getRMTimelineCollectorManager();
- }
-
- @Override
- public void setSystemMetricsPublisher(
- SystemMetricsPublisher metricsPublisher) {
- this.systemMetricsPublisher = metricsPublisher;
- }
-
- @Override
- public SystemMetricsPublisher getSystemMetricsPublisher() {
- return this.systemMetricsPublisher;
- }
-
- @Override
- public void setRMApplicationHistoryWriter(
- RMApplicationHistoryWriter rmApplicationHistoryWriter) {
- this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
-
- }
-
- @Override
- public ConfigurationProvider getConfigurationProvider() {
- return this.configurationProvider;
- }
-
- public void setConfigurationProvider(
- ConfigurationProvider configurationProvider) {
- this.configurationProvider = configurationProvider;
- }
-
- @Override
public long getEpoch() {
return activeServiceContext.getEpoch();
}
@@ -463,27 +516,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getSystemCredentialsForApps();
}
- @Private
- @Unstable
- public RMActiveServiceContext getActiveServiceContext() {
- return activeServiceContext;
- }
-
- @Private
- @Unstable
- void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
- this.activeServiceContext = activeServiceContext;
- }
-
- @Override
- public Configuration getYarnConfiguration() {
- return this.yarnConfiguration;
- }
-
- public void setYarnConfiguration(Configuration yarnConfiguration) {
- this.yarnConfiguration=yarnConfiguration;
- }
-
@Override
public PlacementManager getQueuePlacementManager() {
return this.activeServiceContext.getQueuePlacementManager();
@@ -496,12 +528,12 @@ public class RMContextImpl implements RMContext {
@Override
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
- return this.queueLimitCalculator;
+ return activeServiceContext.getNodeManagerQueueLimitCalculator();
}
public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
- this.queueLimitCalculator = limitCalculator;
+ activeServiceContext.setContainerQueueLimitCalculator(limitCalculator);
}
@Override
@@ -515,21 +547,5 @@ public class RMContextImpl implements RMContext {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
- public String getHAZookeeperConnectionState() {
- if (elector == null) {
- return "Could not find leader elector. Verify both HA and automatic " +
- "failover are enabled.";
- } else {
- return elector.getZookeeperConnectionState();
- }
- }
-
- @Override
- public ResourceManager getResourceManager() {
- return resourceManager;
- }
-
- public void setResourceManager(ResourceManager rm) {
- this.resourceManager = rm;
- }
+ // Note: Read the class Javadoc before adding any services here.
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
new file mode 100644
index 0000000..fe34d63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+
+/**
+ * RMServiceContext class maintains "Always On" services. Services that need to
+ * run always irrespective of the HA state of the RM. This is created during
+ * initialization of RMContextImpl.
+ * <p>
+ * <b>Note:</b> Before adding a service to this class, make sure the service
+ * always runs, irrespective of the HA state of the RM.
+ */
+@Private
+@Unstable
+public class RMServiceContext {
+
+ private Dispatcher rmDispatcher;
+ private boolean isHAEnabled;
+ private HAServiceState haServiceState =
+ HAServiceProtocol.HAServiceState.INITIALIZING;
+ private AdminService adminService;
+ private ConfigurationProvider configurationProvider;
+ private Configuration yarnConfiguration;
+ private RMApplicationHistoryWriter rmApplicationHistoryWriter;
+ private SystemMetricsPublisher systemMetricsPublisher;
+ private EmbeddedElector elector;
+ private final Object haServiceStateLock = new Object();
+ private ResourceManager resourceManager;
+
+ public ResourceManager getResourceManager() {
+ return resourceManager;
+ }
+
+ public void setResourceManager(ResourceManager rm) {
+ this.resourceManager = rm;
+ }
+
+ public ConfigurationProvider getConfigurationProvider() {
+ return this.configurationProvider;
+ }
+
+ public void setConfigurationProvider(
+ ConfigurationProvider configurationProvider) {
+ this.configurationProvider = configurationProvider;
+ }
+
+ public Dispatcher getDispatcher() {
+ return this.rmDispatcher;
+ }
+
+ void setDispatcher(Dispatcher dispatcher) {
+ this.rmDispatcher = dispatcher;
+ }
+
+ public EmbeddedElector getLeaderElectorService() {
+ return this.elector;
+ }
+
+ public void setLeaderElectorService(EmbeddedElector embeddedElector) {
+ this.elector = embeddedElector;
+ }
+
+ public AdminService getRMAdminService() {
+ return this.adminService;
+ }
+
+ void setRMAdminService(AdminService service) {
+ this.adminService = service;
+ }
+
+ void setHAEnabled(boolean rmHAEnabled) {
+ this.isHAEnabled = rmHAEnabled;
+ }
+
+ public boolean isHAEnabled() {
+ return isHAEnabled;
+ }
+
+ public HAServiceState getHAServiceState() {
+ synchronized (haServiceStateLock) {
+ return haServiceState;
+ }
+ }
+
+ void setHAServiceState(HAServiceState serviceState) {
+ synchronized (haServiceStateLock) {
+ this.haServiceState = serviceState;
+ }
+ }
+
+ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+ return this.rmApplicationHistoryWriter;
+ }
+
+ public void setRMApplicationHistoryWriter(
+ RMApplicationHistoryWriter applicationHistoryWriter) {
+ this.rmApplicationHistoryWriter = applicationHistoryWriter;
+ }
+
+ public void setSystemMetricsPublisher(
+ SystemMetricsPublisher metricsPublisher) {
+ this.systemMetricsPublisher = metricsPublisher;
+ }
+
+ public SystemMetricsPublisher getSystemMetricsPublisher() {
+ return this.systemMetricsPublisher;
+ }
+
+ public Configuration getYarnConfiguration() {
+ return this.yarnConfiguration;
+ }
+
+ public void setYarnConfiguration(Configuration yarnConfiguration) {
+ this.yarnConfiguration = yarnConfiguration;
+ }
+
+ public String getHAZookeeperConnectionState() {
+ if (elector == null) {
+ return "Could not find leader elector. Verify both HA and automatic "
+ + "failover are enabled.";
+ } else {
+ return elector.getZookeeperConnectionState();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 1adef33..d8de137 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -347,9 +347,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
- elector = new CuratorBasedElectorService(rmContext, this);
+ elector = new CuratorBasedElectorService(this);
} else {
- elector = new ActiveStandbyElectorBasedElectorService(rmContext);
+ elector = new ActiveStandbyElectorBasedElectorService(this);
}
return elector;
}
@@ -562,7 +562,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
- private RMActiveServiceContext activeServiceContext;
private boolean fromActive = false;
private StandByTransitionRunnable standByTransitionRunnable;
@@ -575,9 +574,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
- activeServiceContext = new RMActiveServiceContext();
- rmContext.setActiveServiceContext(activeServiceContext);
-
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
@@ -1135,7 +1131,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
- resetDispatcher();
+ resetRMContext();
createAndInitActiveServices(true);
}
}
@@ -1280,7 +1276,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected AdminService createAdminService() {
- return new AdminService(this, rmContext);
+ return new AdminService(this);
}
protected RMSecretManagerService createRMSecretManagerService() {
@@ -1403,16 +1399,24 @@ public class ResourceManager extends CompositeService implements Recoverable {
return dispatcher;
}
- private void resetDispatcher() {
+ private void resetRMContext() {
+ RMContextImpl rmContextImpl = new RMContextImpl();
+ // transfer service context to new RM service Context
+ rmContextImpl.setServiceContext(rmContext.getServiceContext());
+
+ // reset dispatcher
Dispatcher dispatcher = setupDispatcher();
- ((Service)dispatcher).init(this.conf);
- ((Service)dispatcher).start();
- removeService((Service)rmDispatcher);
+ ((Service) dispatcher).init(this.conf);
+ ((Service) dispatcher).start();
+ removeService((Service) rmDispatcher);
// Need to stop previous rmDispatcher before assigning new dispatcher
// otherwise causes "AsyncDispatcher event handler" thread leak
((Service) rmDispatcher).stop();
rmDispatcher = dispatcher;
addIfService(rmDispatcher);
+ rmContextImpl.setDispatcher(dispatcher);
+
+ rmContext = rmContextImpl;
rmContext.setDispatcher(rmDispatcher);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index aca2fc5..b0ad977 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -1042,7 +1042,7 @@ public class MockRM extends ResourceManager {
@Override
protected AdminService createAdminService() {
- return new AdminService(this, getRMContext()) {
+ return new AdminService(this) {
@Override
protected void startServer() {
// override to not start rpc handler
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index c4fcc5d..47d18f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -123,13 +123,15 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
throws IOException, InterruptedException {
AdminService as = mock(AdminService.class);
RMContext rc = mock(RMContext.class);
+ ResourceManager rm = mock(ResourceManager.class);
Configuration myConf = new Configuration(conf);
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
+ when(rm.getRMContext()).thenReturn(rc);
when(rc.getRMAdminService()).thenReturn(as);
- ActiveStandbyElectorBasedElectorService
- ees = new ActiveStandbyElectorBasedElectorService(rc);
+ ActiveStandbyElectorBasedElectorService ees =
+ new ActiveStandbyElectorBasedElectorService(rm);
ees.init(myConf);
ees.enterNeutralMode();
@@ -291,7 +293,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
@Override
protected EmbeddedElector createEmbeddedElector() {
- return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
+ return new ActiveStandbyElectorBasedElectorService(this) {
@Override
public void becomeActive() throws
ServiceFailedException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 0efda9e..a558dd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -70,6 +70,7 @@ public class TestRMHA {
private Log LOG = LogFactory.getLog(TestRMHA.class);
private Configuration configuration;
private MockRM rm = null;
+ private MockNM nm = null;
private RMApp app = null;
private RMAppAttempt attempt = null;
private static final String STATE_ERR =
@@ -134,7 +135,7 @@ public class TestRMHA {
try {
rm.getNewAppId();
- rm.registerNode("127.0.0.1:1", 2048);
+ nm = rm.registerNode("127.0.0.1:1", 2048);
app = rm.submitApp(1024);
attempt = app.getCurrentAppAttempt();
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
@@ -549,6 +550,17 @@ public class TestRMHA {
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
assertEquals(1, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getRMApps().size());
+ Assert.assertNotNull("Node not registered", nm);
+
+ rm.adminService.transitionToStandby(requestInfo);
+ checkMonitorHealth();
+ checkStandbyRMFunctionality();
+ // A race condition can let a node register/heartbeat even after the service
+ // is stopping/stopped. A new RMContext is created on every transition
+ // to standby, so metrics should be 0, which indicates that the new context
+ // reference has been taken.
+ nm.registerNode();
+ verifyClusterMetrics(0, 0, 0, 0, 0, 0);
// 3. Create new RM
rm = new MockRM(conf, memStore) {
@@ -590,7 +602,7 @@ public class TestRMHA {
rm = new MockRM(configuration) {
@Override
protected AdminService createAdminService() {
- return new AdminService(this, getRMContext()) {
+ return new AdminService(this) {
int counter = 0;
@Override
protected void setConfig(Configuration conf) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org