You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/07/28 16:50:16 UTC
[07/50] [abbrv] hadoop git commit: YARN-6102. RMActiveService context
to be updated with new RMContext on failover. Contributed by Rohith Sharma K
S.
YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3153284
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3153284
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3153284
Branch: refs/heads/HDFS-10467
Commit: e3153284288d6cfa7a28511dfefe1c8a7d6b4eda
Parents: 2054324
Author: Sunil G <su...@apache.org>
Authored: Mon Jul 24 10:59:01 2017 +0530
Committer: Sunil G <su...@apache.org>
Committed: Mon Jul 24 11:39:03 2017 +0530
----------------------------------------------------------------------
...ActiveStandbyElectorBasedElectorService.java | 12 +-
.../server/resourcemanager/AdminService.java | 71 +++--
.../CuratorBasedElectorService.java | 10 +-
.../resourcemanager/RMActiveServiceContext.java | 36 +--
.../server/resourcemanager/RMContextImpl.java | 312 ++++++++++---------
.../resourcemanager/RMServiceContext.java | 162 ++++++++++
.../server/resourcemanager/ResourceManager.java | 35 ++-
.../metrics/TimelineServiceV2Publisher.java | 6 +-
.../RMTimelineCollectorManager.java | 10 +-
.../yarn/server/resourcemanager/MockRM.java | 2 +-
.../resourcemanager/TestRMEmbeddedElector.java | 8 +-
.../yarn/server/resourcemanager/TestRMHA.java | 16 +-
.../TestSystemMetricsPublisherForV2.java | 13 +-
13 files changed, 451 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
index b59bc25..a8dcda4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
- private RMContext rmContext;
+ private ResourceManager rm;
private byte[] localActiveNodeInfo;
private ActiveStandbyElector elector;
@@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@VisibleForTesting
final Object zkDisconnectLock = new Object();
- ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
+ ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
super(ActiveStandbyElectorBasedElectorService.class.getName());
- this.rmContext = rmContext;
+ this.rm = rm;
}
@Override
@@ -140,7 +140,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
cancelDisconnectTimer();
try {
- rmContext.getRMAdminService().transitionToActive(req);
+ rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e);
}
@@ -151,7 +151,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
cancelDisconnectTimer();
try {
- rmContext.getRMAdminService().transitionToStandby(req);
+ rm.getRMContext().getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
LOG.error("RM could not transition to Standby", e);
}
@@ -205,7 +205,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@SuppressWarnings(value = "unchecked")
@Override
public void notifyFatalError(String errorMessage) {
- rmContext.getDispatcher().getEventHandler().handle(
+ rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 7571765..3457ae3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements
private static final Log LOG = LogFactory.getLog(AdminService.class);
- private final RMContext rmContext;
private final ResourceManager rm;
private String rmId;
@@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
- public AdminService(ResourceManager rm, RMContext rmContext) {
+ public AdminService(ResourceManager rm) {
super(AdminService.class.getName());
this.rm = rm;
- this.rmContext = rmContext;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
autoFailoverEnabled =
- rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
+ rm.getRMContext().isHAEnabled()
+ && HAUtil.isAutomaticFailoverEnabled(conf);
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
@@ -189,7 +188,7 @@ public class AdminService extends CompositeService implements
RMPolicyProvider.getInstance());
}
- if (rmContext.isHAEnabled()) {
+ if (rm.getRMContext().isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
@@ -265,7 +264,7 @@ public class AdminService extends CompositeService implements
}
private synchronized boolean isRMActive() {
- return HAServiceState.ACTIVE == rmContext.getHAServiceState();
+ return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState();
}
private void throwStandbyException() throws StandbyException {
@@ -304,7 +303,7 @@ public class AdminService extends CompositeService implements
// call all refresh*s for active RM to get the updated configurations.
refreshAll();
} catch (Exception e) {
- rmContext
+ rm.getRMContext()
.getDispatcher()
.getEventHandler()
.handle(
@@ -363,7 +362,7 @@ public class AdminService extends CompositeService implements
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
- HAServiceState haState = rmContext.getHAServiceState();
+ HAServiceState haState = rm.getRMContext().getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
@@ -395,11 +394,12 @@ public class AdminService extends CompositeService implements
}
private void refreshQueues() throws IOException, YarnException {
- rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+ rm.getRMContext().getScheduler().reinitialize(getConfig(),
+ this.rm.getRMContext());
// refresh the reservation system
- ReservationSystem rSystem = rmContext.getReservationSystem();
+ ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
if (rSystem != null) {
- rSystem.reinitialize(getConfig(), rmContext);
+ rSystem.reinitialize(getConfig(), rm.getRMContext());
}
}
@@ -418,14 +418,14 @@ public class AdminService extends CompositeService implements
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
switch (request.getDecommissionType()) {
case NORMAL:
- rmContext.getNodesListManager().refreshNodes(conf);
+ rm.getRMContext().getNodesListManager().refreshNodes(conf);
break;
case GRACEFUL:
- rmContext.getNodesListManager().refreshNodesGracefully(
+ rm.getRMContext().getNodesListManager().refreshNodesGracefully(
conf, request.getDecommissionTimeout());
break;
case FORCEFUL:
- rmContext.getNodesListManager().refreshNodesForcefully();
+ rm.getRMContext().getNodesListManager().refreshNodesForcefully();
break;
}
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
@@ -440,7 +440,7 @@ public class AdminService extends CompositeService implements
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- rmContext.getNodesListManager().refreshNodes(conf);
+ rm.getRMContext().getNodesListManager().refreshNodes(conf);
}
@Override
@@ -559,10 +559,11 @@ public class AdminService extends CompositeService implements
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
- rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
- rmContext.getApplicationMasterService().refreshServiceAcls(
+ rm.getRMContext().getClientRMService().refreshServiceAcls(conf,
+ policyProvider);
+ rm.getRMContext().getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
- rmContext.getResourceTrackerService().refreshServiceAcls(
+ rm.getRMContext().getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
}
@@ -601,7 +602,7 @@ public class AdminService extends CompositeService implements
// if any invalid nodes, throw exception instead of partially updating
// valid nodes.
for (NodeId nodeId : nodeIds) {
- RMNode node = this.rmContext.getRMNodes().get(nodeId);
+ RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.error("Resource update get failed on all nodes due to change "
+ "resource on an unrecognized node: " + nodeId);
@@ -619,14 +620,14 @@ public class AdminService extends CompositeService implements
for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
ResourceOption newResourceOption = entry.getValue();
NodeId nodeId = entry.getKey();
- RMNode node = this.rmContext.getRMNodes().get(nodeId);
+ RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
allSuccess = false;
} else {
// update resource to RMNode
- this.rmContext.getDispatcher().getEventHandler()
+ this.rm.getRMContext().getDispatcher().getEventHandler()
.handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
LOG.info("Update resource on node(" + node.getNodeID()
+ ") with resource(" + newResourceOption.toString() + ")");
@@ -661,7 +662,8 @@ public class AdminService extends CompositeService implements
DynamicResourceConfiguration newConf;
InputStream drInputStream =
- this.rmContext.getConfigurationProvider().getConfigurationInputStream(
+ this.rm.getRMContext().getConfigurationProvider()
+ .getConfigurationInputStream(
configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
if (drInputStream != null) {
@@ -679,7 +681,7 @@ public class AdminService extends CompositeService implements
updateNodeResource(updateRequest);
}
// refresh dynamic resource in ResourceTrackerService
- this.rmContext.getResourceTrackerService().
+ this.rm.getRMContext().getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@@ -692,7 +694,8 @@ public class AdminService extends CompositeService implements
private synchronized Configuration getConfiguration(Configuration conf,
String... confFileNames) throws YarnException, IOException {
for (String confFileName : confFileNames) {
- InputStream confFileInputStream = this.rmContext.getConfigurationProvider()
+ InputStream confFileInputStream =
+ this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(conf, confFileName);
if (confFileInputStream != null) {
conf.addResource(confFileInputStream);
@@ -746,7 +749,7 @@ public class AdminService extends CompositeService implements
AddToClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
try {
- rmContext.getNodeLabelManager()
+ rm.getRMContext().getNodeLabelManager()
.addToCluserNodeLabels(request.getNodeLabels());
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@@ -769,7 +772,8 @@ public class AdminService extends CompositeService implements
RemoveFromClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
try {
- rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
+ rm.getRMContext().getNodeLabelManager()
+ .removeFromClusterNodeLabels(request.getNodeLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
return response;
@@ -805,19 +809,20 @@ public class AdminService extends CompositeService implements
boolean isKnown = false;
// both active and inactive nodes are recognized as known nodes
if (requestedNode.getPort() != 0) {
- if (rmContext.getRMNodes().containsKey(requestedNode)
- || rmContext.getInactiveRMNodes().containsKey(requestedNode)) {
+ if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm
+ .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) {
isKnown = true;
}
} else {
- for (NodeId knownNode : rmContext.getRMNodes().keySet()) {
+ for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
}
}
if (!isKnown) {
- for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) {
+ for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes()
+ .keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
@@ -841,7 +846,7 @@ public class AdminService extends CompositeService implements
}
}
try {
- rmContext.getNodeLabelManager().replaceLabelsOnNode(
+ rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(
request.getNodeToLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
@@ -878,7 +883,7 @@ public class AdminService extends CompositeService implements
checkRMStatus(user.getShortUserName(), operation, msg);
- Set<NodeId> decommissioningNodes = rmContext.getNodesListManager()
+ Set<NodeId> decommissioningNodes = rm.getRMContext().getNodesListManager()
.checkForDecommissioningNodes();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@@ -914,6 +919,6 @@ public class AdminService extends CompositeService implements
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- rmContext.getScheduler().setClusterMaxPriority(conf);
+ rm.getRMContext().getScheduler().setClusterMaxPriority(conf);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
index bcdf48b..d7485f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
@@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService
LogFactory.getLog(CuratorBasedElectorService.class);
private LeaderLatch leaderLatch;
private CuratorFramework curator;
- private RMContext rmContext;
private String latchPath;
private String rmId;
private ResourceManager rm;
- public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
+ public CuratorBasedElectorService(ResourceManager rm) {
super(CuratorBasedElectorService.class.getName());
- this.rmContext = rmContext;
this.rm = rm;
}
@@ -102,7 +100,8 @@ public class CuratorBasedElectorService extends AbstractService
public void isLeader() {
LOG.info(rmId + "is elected leader, transitioning to active");
try {
- rmContext.getRMAdminService().transitionToActive(
+ rm.getRMContext().getRMAdminService()
+ .transitionToActive(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
@@ -123,7 +122,8 @@ public class CuratorBasedElectorService extends AbstractService
public void notLeader() {
LOG.info(rmId + " relinquish leadership");
try {
- rmContext.getRMAdminService().transitionToStandby(
+ rm.getRMContext().getRMAdminService()
+ .transitionToStandby(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 0e305a9..9dc5945 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -42,20 +42,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
/**
- * The RMActiveServiceContext is the class that maintains all the
- * RMActiveService contexts.This is expected to be used only by ResourceManager
- * and RMContext.
+ * The RMActiveServiceContext is the class that maintains the <b>Active</b>
+ * service context, i.e. the services that need to run only on the Active RM.
+ * This is expected to be used only by RMContext.
*/
@Private
@Unstable
@@ -94,7 +94,6 @@ public class RMActiveServiceContext {
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
- private RMTimelineCollectorManager timelineCollectorManager;
private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
@@ -107,6 +106,7 @@ public class RMActiveServiceContext {
private PlacementManager queuePlacementManager = null;
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
+ private QueueLimitCalculator queueLimitCalculator;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@@ -374,19 +374,6 @@ public class RMActiveServiceContext {
@Private
@Unstable
- public RMTimelineCollectorManager getRMTimelineCollectorManager() {
- return timelineCollectorManager;
- }
-
- @Private
- @Unstable
- public void setRMTimelineCollectorManager(
- RMTimelineCollectorManager collectorManager) {
- this.timelineCollectorManager = collectorManager;
- }
-
- @Private
- @Unstable
public long getEpoch() {
return this.epoch;
}
@@ -483,4 +470,17 @@ public class RMActiveServiceContext {
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.rmAppLifetimeMonitor;
}
+
+ @Private
+ @Unstable
+ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+ return this.queueLimitCalculator;
+ }
+
+ @Private
+ @Unstable
+ public void setContainerQueueLimitCalculator(
+ QueueLimitCalculator limitCalculator) {
+ this.queueLimitCalculator = limitCalculator;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index fb160c4..db2c585 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -57,37 +56,39 @@ import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
+/**
+ * RMContextImpl class holds two service contexts.
+ * <ul>
+ * <li>serviceContext : These are called <b>Always On</b> services.
+ * Services that need to run always irrespective of the HA state of the RM.</li>
+ * <li>activeServiceContext : Active services context. Services that need to run
+ * only on the Active RM.</li>
+ * </ul>
+ * <p>
+ * <b>Note:</b> If any new service is to be added to the context, add it to the
+ * right context as per the above description.
+ */
public class RMContextImpl implements RMContext {
- private Dispatcher rmDispatcher;
-
- private boolean isHAEnabled;
-
- private HAServiceState haServiceState =
- HAServiceProtocol.HAServiceState.INITIALIZING;
-
- private AdminService adminService;
-
- private ConfigurationProvider configurationProvider;
+ /**
+ * RM service context which runs throughout the RM life span. It is created
+ * once during the start of the RM.
+ */
+ private RMServiceContext serviceContext;
+ /**
+ * RM Active service context. This will be recreated for every transition from
+ * ACTIVE to STANDBY.
+ */
private RMActiveServiceContext activeServiceContext;
- private Configuration yarnConfiguration;
-
- private RMApplicationHistoryWriter rmApplicationHistoryWriter;
- private SystemMetricsPublisher systemMetricsPublisher;
- private EmbeddedElector elector;
-
- private QueueLimitCalculator queueLimitCalculator;
-
- private final Object haServiceStateLock = new Object();
-
- private ResourceManager resourceManager;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
*/
public RMContextImpl() {
+ this.serviceContext = new RMServiceContext();
+ this.activeServiceContext = new RMActiveServiceContext();
}
@VisibleForTesting
@@ -138,19 +139,154 @@ public class RMContextImpl implements RMContext {
clientToAMTokenSecretManager, null);
}
+ /**
+ * RM service context which runs throughout the JVM life span. It is created
+ * once during the start of the RM.
+ * @return serviceContext of RM
+ */
+ @Private
+ @Unstable
+ public RMServiceContext getServiceContext() {
+ return serviceContext;
+ }
+
+ /**
+ * <b>Note:</b> setting service context clears all services embedded with it.
+ * @param context rm service context
+ */
+ @Private
+ @Unstable
+ public void setServiceContext(RMServiceContext context) {
+ this.serviceContext = context;
+ }
+
@Override
- public Dispatcher getDispatcher() {
- return this.rmDispatcher;
+ public ResourceManager getResourceManager() {
+ return serviceContext.getResourceManager();
+ }
+
+ public void setResourceManager(ResourceManager rm) {
+ serviceContext.setResourceManager(rm);
+ }
+
+ @Override
+ public EmbeddedElector getLeaderElectorService() {
+ return serviceContext.getLeaderElectorService();
}
@Override
public void setLeaderElectorService(EmbeddedElector elector) {
- this.elector = elector;
+ serviceContext.setLeaderElectorService(elector);
}
@Override
- public EmbeddedElector getLeaderElectorService() {
- return this.elector;
+ public Dispatcher getDispatcher() {
+ return serviceContext.getDispatcher();
+ }
+
+ void setDispatcher(Dispatcher dispatcher) {
+ serviceContext.setDispatcher(dispatcher);
+ }
+
+ @Override
+ public AdminService getRMAdminService() {
+ return serviceContext.getRMAdminService();
+ }
+
+ void setRMAdminService(AdminService adminService) {
+ serviceContext.setRMAdminService(adminService);
+ }
+
+ @Override
+ public boolean isHAEnabled() {
+ return serviceContext.isHAEnabled();
+ }
+
+ void setHAEnabled(boolean isHAEnabled) {
+ serviceContext.setHAEnabled(isHAEnabled);
+ }
+
+ @Override
+ public HAServiceState getHAServiceState() {
+ return serviceContext.getHAServiceState();
+ }
+
+ void setHAServiceState(HAServiceState serviceState) {
+ serviceContext.setHAServiceState(serviceState);
+ }
+
+ @Override
+ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+ return serviceContext.getRMApplicationHistoryWriter();
+ }
+
+ @Override
+ public void setRMApplicationHistoryWriter(
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+ }
+
+ @Override
+ public SystemMetricsPublisher getSystemMetricsPublisher() {
+ return serviceContext.getSystemMetricsPublisher();
+ }
+
+ @Override
+ public void setSystemMetricsPublisher(
+ SystemMetricsPublisher metricsPublisher) {
+ serviceContext.setSystemMetricsPublisher(metricsPublisher);
+ }
+
+ @Override
+ public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+ return serviceContext.getRMTimelineCollectorManager();
+ }
+
+ @Override
+ public void setRMTimelineCollectorManager(
+ RMTimelineCollectorManager timelineCollectorManager) {
+ serviceContext.setRMTimelineCollectorManager(timelineCollectorManager);
+ }
+
+ @Override
+ public ConfigurationProvider getConfigurationProvider() {
+ return serviceContext.getConfigurationProvider();
+ }
+
+ public void setConfigurationProvider(
+ ConfigurationProvider configurationProvider) {
+ serviceContext.setConfigurationProvider(configurationProvider);
+ }
+
+ @Override
+ public Configuration getYarnConfiguration() {
+ return serviceContext.getYarnConfiguration();
+ }
+
+ public void setYarnConfiguration(Configuration yarnConfiguration) {
+ serviceContext.setYarnConfiguration(yarnConfiguration);
+ }
+
+ public String getHAZookeeperConnectionState() {
+ return serviceContext.getHAZookeeperConnectionState();
+ }
+
+ // ==========================================================================
+ /**
+ * RM Active service context. This will be recreated for every transition from
+ * ACTIVE to STANDBY.
+ * @return activeServiceContext of active services
+ */
+ @Private
+ @Unstable
+ public RMActiveServiceContext getActiveServiceContext() {
+ return activeServiceContext;
+ }
+
+ @Private
+ @Unstable
+ void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
+ this.activeServiceContext = activeServiceContext;
}
@Override
@@ -228,11 +364,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getClientToAMTokenSecretManager();
}
- @Override
- public AdminService getRMAdminService() {
- return this.adminService;
- }
-
@VisibleForTesting
public void setStateStore(RMStateStore store) {
activeServiceContext.setStateStore(store);
@@ -253,24 +384,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getResourceTrackerService();
}
- void setHAEnabled(boolean isHAEnabled) {
- this.isHAEnabled = isHAEnabled;
- }
-
- void setHAServiceState(HAServiceState serviceState) {
- synchronized (haServiceStateLock) {
- this.haServiceState = serviceState;
- }
- }
-
- void setDispatcher(Dispatcher dispatcher) {
- this.rmDispatcher = dispatcher;
- }
-
- void setRMAdminService(AdminService adminService) {
- this.adminService = adminService;
- }
-
@Override
public void setClientRMService(ClientRMService clientRMService) {
activeServiceContext.setClientRMService(clientRMService);
@@ -348,18 +461,6 @@ public class RMContextImpl implements RMContext {
activeServiceContext.setResourceTrackerService(resourceTrackerService);
}
- @Override
- public boolean isHAEnabled() {
- return isHAEnabled;
- }
-
- @Override
- public HAServiceState getHAServiceState() {
- synchronized (haServiceStateLock) {
- return haServiceState;
- }
- }
-
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
}
@@ -369,50 +470,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.isWorkPreservingRecoveryEnabled();
}
- @Override
- public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
- return this.rmApplicationHistoryWriter;
- }
-
- @Override
- public void setRMTimelineCollectorManager(
- RMTimelineCollectorManager timelineCollectorManager) {
- activeServiceContext.setRMTimelineCollectorManager(
- timelineCollectorManager);
- }
-
- @Override
- public RMTimelineCollectorManager getRMTimelineCollectorManager() {
- return activeServiceContext.getRMTimelineCollectorManager();
- }
-
- @Override
- public void setSystemMetricsPublisher(
- SystemMetricsPublisher metricsPublisher) {
- this.systemMetricsPublisher = metricsPublisher;
- }
-
- @Override
- public SystemMetricsPublisher getSystemMetricsPublisher() {
- return this.systemMetricsPublisher;
- }
-
- @Override
- public void setRMApplicationHistoryWriter(
- RMApplicationHistoryWriter rmApplicationHistoryWriter) {
- this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
-
- }
-
- @Override
- public ConfigurationProvider getConfigurationProvider() {
- return this.configurationProvider;
- }
-
- public void setConfigurationProvider(
- ConfigurationProvider configurationProvider) {
- this.configurationProvider = configurationProvider;
- }
@Override
public long getEpoch() {
@@ -463,27 +520,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getSystemCredentialsForApps();
}
- @Private
- @Unstable
- public RMActiveServiceContext getActiveServiceContext() {
- return activeServiceContext;
- }
-
- @Private
- @Unstable
- void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
- this.activeServiceContext = activeServiceContext;
- }
-
- @Override
- public Configuration getYarnConfiguration() {
- return this.yarnConfiguration;
- }
-
- public void setYarnConfiguration(Configuration yarnConfiguration) {
- this.yarnConfiguration=yarnConfiguration;
- }
-
@Override
public PlacementManager getQueuePlacementManager() {
return this.activeServiceContext.getQueuePlacementManager();
@@ -496,12 +532,12 @@ public class RMContextImpl implements RMContext {
@Override
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
- return this.queueLimitCalculator;
+ return activeServiceContext.getNodeManagerQueueLimitCalculator();
}
public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
- this.queueLimitCalculator = limitCalculator;
+ activeServiceContext.setContainerQueueLimitCalculator(limitCalculator);
}
@Override
@@ -515,21 +551,5 @@ public class RMContextImpl implements RMContext {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
- public String getHAZookeeperConnectionState() {
- if (elector == null) {
- return "Could not find leader elector. Verify both HA and automatic " +
- "failover are enabled.";
- } else {
- return elector.getZookeeperConnectionState();
- }
- }
-
- @Override
- public ResourceManager getResourceManager() {
- return resourceManager;
- }
-
- public void setResourceManager(ResourceManager rm) {
- this.resourceManager = rm;
- }
+ // Note: Read java doc before adding any services over here.
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
new file mode 100644
index 0000000..45c6166
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+
+/**
+ * RMServiceContext class maintains "Always On" services, i.e. services that
+ * need to run always, irrespective of the HA state of the RM. It is created
+ * during initialization of RMContextImpl.
+ * <p>
+ * <b>Note:</b> If any service is to be added to this class, make sure that it
+ * will always be running, irrespective of the HA state of the RM.
+ */
+@Private
+@Unstable
+public class RMServiceContext {
+
+ private Dispatcher rmDispatcher;
+ private boolean isHAEnabled; // plain field; not guarded by haServiceStateLock
+ private HAServiceState haServiceState =
+ HAServiceProtocol.HAServiceState.INITIALIZING; // until setHAServiceState()
+ private AdminService adminService;
+ private ConfigurationProvider configurationProvider;
+ private Configuration yarnConfiguration;
+ private RMApplicationHistoryWriter rmApplicationHistoryWriter;
+ private SystemMetricsPublisher systemMetricsPublisher;
+ private EmbeddedElector elector; // may be null, see getHAZookeeperConnectionState()
+ private final Object haServiceStateLock = new Object(); // guards haServiceState
+ private ResourceManager resourceManager;
+ private RMTimelineCollectorManager timelineCollectorManager;
+
+ public ResourceManager getResourceManager() {
+ return resourceManager;
+ }
+
+ public void setResourceManager(ResourceManager rm) {
+ this.resourceManager = rm;
+ }
+
+ public ConfigurationProvider getConfigurationProvider() {
+ return this.configurationProvider;
+ }
+
+ public void setConfigurationProvider(
+ ConfigurationProvider configurationProvider) {
+ this.configurationProvider = configurationProvider;
+ }
+
+ public Dispatcher getDispatcher() {
+ return this.rmDispatcher;
+ }
+
+ void setDispatcher(Dispatcher dispatcher) { // package-private setter
+ this.rmDispatcher = dispatcher;
+ }
+
+ public EmbeddedElector getLeaderElectorService() {
+ return this.elector;
+ }
+
+ public void setLeaderElectorService(EmbeddedElector embeddedElector) {
+ this.elector = embeddedElector;
+ }
+
+ public AdminService getRMAdminService() {
+ return this.adminService;
+ }
+
+ void setRMAdminService(AdminService service) { // package-private setter
+ this.adminService = service;
+ }
+
+ void setHAEnabled(boolean rmHAEnabled) { // package-private setter
+ this.isHAEnabled = rmHAEnabled;
+ }
+
+ public boolean isHAEnabled() {
+ return isHAEnabled;
+ }
+
+ public HAServiceState getHAServiceState() {
+ synchronized (haServiceStateLock) { // same lock as setHAServiceState()
+ return haServiceState;
+ }
+ }
+
+ void setHAServiceState(HAServiceState serviceState) { // package-private setter
+ synchronized (haServiceStateLock) { // same lock as getHAServiceState()
+ this.haServiceState = serviceState;
+ }
+ }
+
+ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+ return this.rmApplicationHistoryWriter;
+ }
+
+ public void setRMApplicationHistoryWriter(
+ RMApplicationHistoryWriter applicationHistoryWriter) {
+ this.rmApplicationHistoryWriter = applicationHistoryWriter;
+ }
+
+ public void setSystemMetricsPublisher(
+ SystemMetricsPublisher metricsPublisher) {
+ this.systemMetricsPublisher = metricsPublisher;
+ }
+
+ public SystemMetricsPublisher getSystemMetricsPublisher() {
+ return this.systemMetricsPublisher;
+ }
+
+ public Configuration getYarnConfiguration() {
+ return this.yarnConfiguration;
+ }
+
+ public void setYarnConfiguration(Configuration yarnConfiguration) {
+ this.yarnConfiguration = yarnConfiguration;
+ }
+
+ public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+ return timelineCollectorManager;
+ }
+
+ public void setRMTimelineCollectorManager(
+ RMTimelineCollectorManager collectorManager) {
+ this.timelineCollectorManager = collectorManager;
+ }
+
+ public String getHAZookeeperConnectionState() {
+ if (elector == null) { // no elector set: return a message instead of a state
+ return "Could not find leader elector. Verify both HA and automatic "
+ + "failover are enabled.";
+ } else {
+ return elector.getZookeeperConnectionState(); // delegate to the elector
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index f727f55..b63b60d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -115,7 +115,6 @@ import org.eclipse.jetty.webapp.WebAppContext;
import com.google.common.annotations.VisibleForTesting;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
@@ -345,9 +344,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
- elector = new CuratorBasedElectorService(rmContext, this);
+ elector = new CuratorBasedElectorService(this);
} else {
- elector = new ActiveStandbyElectorBasedElectorService(rmContext);
+ elector = new ActiveStandbyElectorBasedElectorService(this);
}
return elector;
}
@@ -497,7 +496,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
- return new RMTimelineCollectorManager(rmContext);
+ return new RMTimelineCollectorManager(this);
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
@@ -508,7 +507,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
// we're dealing with the v.2.x publisher
LOG.info("system metrics publisher with the timeline service V2 is " +
"configured");
- publisher = new TimelineServiceV2Publisher(rmContext);
+ publisher = new TimelineServiceV2Publisher(
+ rmContext.getRMTimelineCollectorManager());
} else {
// we're dealing with the v.1.x publisher
LOG.info("system metrics publisher with the timeline service V1 is " +
@@ -560,7 +560,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
- private RMActiveServiceContext activeServiceContext;
private boolean fromActive = false;
private StandByTransitionRunnable standByTransitionRunnable;
@@ -573,9 +572,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
- activeServiceContext = new RMActiveServiceContext();
- rmContext.setActiveServiceContext(activeServiceContext);
-
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
@@ -1149,7 +1145,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
- resetDispatcher();
+ resetRMContext();
createAndInitActiveServices(true);
}
}
@@ -1294,7 +1290,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected AdminService createAdminService() {
- return new AdminService(this, rmContext);
+ return new AdminService(this);
}
protected RMSecretManagerService createRMSecretManagerService() {
@@ -1417,17 +1413,24 @@ public class ResourceManager extends CompositeService implements Recoverable {
return dispatcher;
}
- private void resetDispatcher() {
+ private void resetRMContext() {
+ RMContextImpl rmContextImpl = new RMContextImpl();
+ // transfer the existing service context to the new RMContext
+ rmContextImpl.setServiceContext(rmContext.getServiceContext());
+
+ // reset dispatcher
Dispatcher dispatcher = setupDispatcher();
- ((Service)dispatcher).init(this.conf);
- ((Service)dispatcher).start();
- removeService((Service)rmDispatcher);
+ ((Service) dispatcher).init(this.conf);
+ ((Service) dispatcher).start();
+ removeService((Service) rmDispatcher);
// Need to stop previous rmDispatcher before assigning new dispatcher
// otherwise causes "AsyncDispatcher event handler" thread leak
((Service) rmDispatcher).stop();
rmDispatcher = dispatcher;
addIfService(rmDispatcher);
- rmContext.setDispatcher(rmDispatcher);
+ rmContextImpl.setDispatcher(dispatcher);
+
+ rmContext = rmContextImpl;
}
private void setSchedulerRecoveryStartAndWaitTime(RMState state,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
index a8bf6bd..a3a2ebc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -75,9 +74,10 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
private RMTimelineCollectorManager rmTimelineCollectorManager;
private boolean publishContainerEvents;
- public TimelineServiceV2Publisher(RMContext rmContext) {
+ public TimelineServiceV2Publisher(
+ RMTimelineCollectorManager timelineCollectorManager) {
super("TimelineserviceV2Publisher");
- rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
+ rmTimelineCollectorManager = timelineCollectorManager;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
index 64c3749..c980458 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
@@ -41,16 +41,16 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
private static final Log LOG =
LogFactory.getLog(RMTimelineCollectorManager.class);
- private RMContext rmContext;
+ private ResourceManager rm;
- public RMTimelineCollectorManager(RMContext rmContext) {
+ public RMTimelineCollectorManager(ResourceManager resourceManager) {
super(RMTimelineCollectorManager.class.getName());
- this.rmContext = rmContext;
+ this.rm = resourceManager;
}
@Override
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
- RMApp app = rmContext.getRMApps().get(appId);
+ RMApp app = rm.getRMContext().getRMApps().get(appId);
if (app == null) {
throw new YarnRuntimeException(
"Unable to get the timeline collector context info for a " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 23009db..5a215e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -1055,7 +1055,7 @@ public class MockRM extends ResourceManager {
@Override
protected AdminService createAdminService() {
- return new AdminService(this, getRMContext()) {
+ return new AdminService(this) {
@Override
protected void startServer() {
// override to not start rpc handler
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index 1fe9bbe..140483a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -122,13 +122,15 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
throws IOException, InterruptedException {
AdminService as = mock(AdminService.class);
RMContext rc = mock(RMContext.class);
+ ResourceManager rm = mock(ResourceManager.class);
Configuration myConf = new Configuration(conf);
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
+ when(rm.getRMContext()).thenReturn(rc);
when(rc.getRMAdminService()).thenReturn(as);
- ActiveStandbyElectorBasedElectorService
- ees = new ActiveStandbyElectorBasedElectorService(rc);
+ ActiveStandbyElectorBasedElectorService ees =
+ new ActiveStandbyElectorBasedElectorService(rm);
ees.init(myConf);
ees.enterNeutralMode();
@@ -290,7 +292,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
@Override
protected EmbeddedElector createEmbeddedElector() {
- return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
+ return new ActiveStandbyElectorBasedElectorService(this) {
@Override
public void becomeActive() throws
ServiceFailedException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index f807217..ec6b1e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -71,6 +71,7 @@ public class TestRMHA {
private Log LOG = LogFactory.getLog(TestRMHA.class);
private Configuration configuration;
private MockRM rm = null;
+ private MockNM nm = null;
private RMApp app = null;
private RMAppAttempt attempt = null;
private static final String STATE_ERR =
@@ -135,7 +136,7 @@ public class TestRMHA {
try {
rm.getNewAppId();
- rm.registerNode("127.0.0.1:1", 2048);
+ nm = rm.registerNode("127.0.0.1:1", 2048);
app = rm.submitApp(1024);
attempt = app.getCurrentAppAttempt();
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
@@ -551,6 +552,17 @@ public class TestRMHA {
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
assertEquals(1, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getRMApps().size());
+ Assert.assertNotNull("Node not registered", nm);
+
+ rm.adminService.transitionToStandby(requestInfo);
+ checkMonitorHealth();
+ checkStandbyRMFunctionality();
+ // A race condition can let a node register/heartbeat even after the
+ // services are stopping/stopped. A new RMContext is created on every
+ // transition to standby, so the metrics should be 0, which indicates that
+ // the new context reference has been taken.
+ nm.registerNode();
+ verifyClusterMetrics(0, 0, 0, 0, 0, 0);
// 3. Create new RM
rm = new MockRM(conf, memStore) {
@@ -592,7 +604,7 @@ public class TestRMHA {
rm = new MockRM(configuration) {
@Override
protected AdminService createAdminService() {
- return new AdminService(this, getRMContext()) {
+ return new AdminService(this) {
int counter = 0;
@Override
protected void setConfig(Configuration conf) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3153284/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index 2d40c91..ec09945 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -98,10 +99,12 @@ public class TestSystemMetricsPublisherForV2 {
new Path(testRootDir.getAbsolutePath()), true);
}
+ ResourceManager rm = mock(ResourceManager.class);
RMContext rmContext = mock(RMContext.class);
rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
- rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext);
+ when(rm.getRMContext()).thenReturn(rmContext);
+ rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
when(rmContext.getRMTimelineCollectorManager()).thenReturn(
rmTimelineCollectorManager);
@@ -113,7 +116,8 @@ public class TestSystemMetricsPublisherForV2 {
dispatcher.init(conf);
dispatcher.start();
- metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
+ metricsPublisher =
+ new TimelineServiceV2Publisher(rmTimelineCollectorManager) {
@Override
protected Dispatcher getDispatcher() {
return dispatcher;
@@ -162,7 +166,7 @@ public class TestSystemMetricsPublisherForV2 {
public void testSystemMetricPublisherInitialization() {
@SuppressWarnings("resource")
TimelineServiceV2Publisher publisher =
- new TimelineServiceV2Publisher(mock(RMContext.class));
+ new TimelineServiceV2Publisher(mock(RMTimelineCollectorManager.class));
try {
Configuration conf = getTimelineV2Conf();
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
@@ -174,7 +178,8 @@ public class TestSystemMetricsPublisherForV2 {
publisher.stop();
- publisher = new TimelineServiceV2Publisher(mock(RMContext.class));
+ publisher = new TimelineServiceV2Publisher(
+ mock(RMTimelineCollectorManager.class));
conf = getTimelineV2Conf();
publisher.init(conf);
assertTrue("Expected to have registered event handlers and set ready to "
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org