You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/12/03 18:33:47 UTC
svn commit: r1547492 - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/
hadoop-yarn/hadoop-y...
Author: arp
Date: Tue Dec 3 17:33:46 2013
New Revision: 1547492
URL: http://svn.apache.org/r1547492
Log:
Merging r1547121 through r1547473 from trunk to branch HDFS-2832
Added:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMNotYetActiveException.java
- copied unchanged from r1547473, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMNotYetActiveException.java
Removed:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Tue Dec 3 17:33:46 2013
@@ -15,7 +15,7 @@ Trunk - Unreleased
YARN-524 TestYarnVersionInfo failing if generated properties doesn't
include an SVN URL. (stevel)
-Release 2.3.0 - UNRELEASED
+Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -129,6 +129,9 @@ Release 2.3.0 - UNRELEASED
YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf
queues. (Sandy Ryza)
+ YARN-1318. Promoted AdminService to an Always-On service and merged it into
+ RMHAProtocolService. (Karthik Kambatla via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
@@ -188,7 +191,7 @@ Release 2.3.0 - UNRELEASED
YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some
tests. (Jian He via vinodkv)
-Release 2.2.1 - UNRELEASED
+Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Dec 3 17:33:46 2013
@@ -285,18 +285,6 @@ public class YarnConfiguration extends C
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
- @org.apache.hadoop.classification.InterfaceAudience.Private
- // TODO Remove after YARN-1318
- public static final String RM_HA_ADMIN_ADDRESS =
- RM_HA_PREFIX + "admin.address";
- public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034;
- public static String DEFAULT_RM_HA_ADMIN_ADDRESS =
- "0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT;
- public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT =
- RM_HA_PREFIX + "admin.client.thread-count";
- public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
- // end @Private
-
public static final List<String> RM_RPC_ADDRESS_CONF_KEYS =
Collections.unmodifiableList(Arrays.asList(
RM_ADDRESS,
@@ -304,9 +292,7 @@ public class YarnConfiguration extends C
RM_ADMIN_ADDRESS,
RM_RESOURCE_TRACKER_ADDRESS,
RM_WEBAPP_ADDRESS,
- RM_WEBAPP_HTTPS_ADDRESS,
- // TODO Remove after YARN-1318
- RM_HA_ADMIN_ADDRESS));
+ RM_WEBAPP_HTTPS_ADDRESS));
////////////////////////////////
// RM state store configs
@@ -786,11 +772,6 @@ public class YarnConfiguration extends C
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl";
- @org.apache.hadoop.classification.InterfaceAudience.Private
- // TODO Remove after YARN-1318
- public static final String
- YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL =
- CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL;
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java Tue Dec 3 17:33:46 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
@@ -45,25 +46,25 @@ public interface ResourceManagerAdminist
@Public
@Stable
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
- throws YarnException, IOException;
+ throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
- throws YarnException, IOException;
+ throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
- throws YarnException, IOException;
+ throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
- throws YarnException, IOException;
+ throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java Tue Dec 3 17:33:46 2013
@@ -32,9 +32,9 @@ public class RMHAServiceTarget extends H
public RMHAServiceTarget(YarnConfiguration conf)
throws IOException {
haAdminServiceAddress = conf.getSocketAddr(
- YarnConfiguration.RM_HA_ADMIN_ADDRESS,
- YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
- YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
+ YarnConfiguration.RM_ADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
}
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Tue Dec 3 17:33:46 2013
@@ -21,18 +21,31 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceStatus;
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -51,22 +64,20 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
-public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol {
+public class AdminService extends AbstractService implements
+ HAServiceProtocol, ResourceManagerAdministrationProtocol {
private static final Log LOG = LogFactory.getLog(AdminService.class);
- private final Configuration conf;
- private final ResourceScheduler scheduler;
private final RMContext rmContext;
- private final NodesListManager nodesListManager;
-
- private final ClientRMService clientRMService;
- private final ApplicationMasterService applicationMasterService;
- private final ResourceTrackerService resourceTrackerService;
-
+ private final ResourceManager rm;
+ @VisibleForTesting
+ protected HAServiceProtocol.HAServiceState
+ haState = HAServiceProtocol.HAServiceState.INITIALIZING;
+ boolean haEnabled;
+
private Server server;
private InetSocketAddress masterServiceAddress;
private AccessControlList adminAcl;
@@ -74,23 +85,21 @@ public class AdminService extends Abstra
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- public AdminService(Configuration conf, ResourceScheduler scheduler,
- RMContext rmContext, NodesListManager nodesListManager,
- ClientRMService clientRMService,
- ApplicationMasterService applicationMasterService,
- ResourceTrackerService resourceTrackerService) {
+ public AdminService(ResourceManager rm, RMContext rmContext) {
super(AdminService.class.getName());
- this.conf = conf;
- this.scheduler = scheduler;
+ this.rm = rm;
this.rmContext = rmContext;
- this.nodesListManager = nodesListManager;
- this.clientRMService = clientRMService;
- this.applicationMasterService = applicationMasterService;
- this.resourceTrackerService = resourceTrackerService;
}
@Override
- public void serviceInit(Configuration conf) throws Exception {
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ haEnabled = HAUtil.isHAEnabled(conf);
+ if (haEnabled) {
+ HAUtil.verifyAndSetConfiguration(conf);
+ rm.setConf(conf);
+ }
+ rm.createAndInitActiveServices();
+
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -102,50 +111,185 @@ public class AdminService extends Abstra
}
@Override
- protected void serviceStart() throws Exception {
+ protected synchronized void serviceStart() throws Exception {
+ if (haEnabled) {
+ transitionToStandby(true);
+ } else {
+ transitionToActive();
+ }
+ startServer();
+ super.serviceStart();
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+ stopServer();
+ transitionToStandby(false);
+ haState = HAServiceState.STOPPING;
+ super.serviceStop();
+ }
+
+ protected void startServer() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
- this.server =
- rpc.getServer(ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
- conf, null,
- conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
- YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
-
+ this.server = (Server) rpc.getServer(
+ ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
+ conf, null,
+ conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
+
// Enable service authorization?
if (conf.getBoolean(
- CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
}
+ if (haEnabled) {
+ RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
+ ProtobufRpcEngine.class);
+
+ HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
+ new HAServiceProtocolServerSideTranslatorPB(this);
+ BlockingService haPbService =
+ HAServiceProtocolProtos.HAServiceProtocolService
+ .newReflectiveBlockingService(haServiceProtocolXlator);
+ server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ HAServiceProtocol.class, haPbService);
+ }
+
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
- server.getListenerAddress());
- super.serviceStart();
+ server.getListenerAddress());
}
- @Override
- protected void serviceStop() throws Exception {
+ protected void stopServer() throws Exception {
if (this.server != null) {
this.server.stop();
}
- super.serviceStop();
+ }
+
+ private UserGroupInformation checkAccess(String method) throws IOException {
+ return RMServerUtils.verifyAccess(adminAcl, method, LOG);
}
private UserGroupInformation checkAcls(String method) throws YarnException {
try {
- return RMServerUtils.verifyAccess(adminAcl, method, LOG);
+ return checkAccess(method);
} catch (IOException ioe) {
throw RPCUtil.getRemoteException(ioe);
}
}
-
+
+ private synchronized boolean isRMActive() {
+ return HAServiceState.ACTIVE == haState;
+ }
+
+ @Override
+ public synchronized void monitorHealth()
+ throws IOException {
+ checkAccess("monitorHealth");
+ if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
+ throw new HealthCheckFailedException(
+ "Active ResourceManager services are not running!");
+ }
+ }
+
+ synchronized void transitionToActive() throws Exception {
+ if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
+ LOG.info("Already in active state");
+ return;
+ }
+
+ LOG.info("Transitioning to active");
+ rm.startActiveServices();
+ haState = HAServiceProtocol.HAServiceState.ACTIVE;
+ LOG.info("Transitioned to active");
+ }
+
+ @Override
+ public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
+ throws IOException {
+ UserGroupInformation user = checkAccess("transitionToActive");
+ // TODO (YARN-1177): When automatic failover is enabled,
+ // check if transition should be allowed for this request
+ try {
+ transitionToActive();
+ RMAuditLogger.logSuccess(user.getShortUserName(),
+ "transitionToActive", "RMHAProtocolService");
+ } catch (Exception e) {
+ RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
+ adminAcl.toString(), "RMHAProtocolService",
+ "Exception transitioning to active");
+ throw new ServiceFailedException(
+ "Error when transitioning to Active mode", e);
+ }
+ }
+
+ synchronized void transitionToStandby(boolean initialize)
+ throws Exception {
+ if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
+ LOG.info("Already in standby state");
+ return;
+ }
+
+ LOG.info("Transitioning to standby");
+ if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
+ rm.stopActiveServices();
+ if (initialize) {
+ rm.createAndInitActiveServices();
+ }
+ }
+ haState = HAServiceProtocol.HAServiceState.STANDBY;
+ LOG.info("Transitioned to standby");
+ }
+
+ @Override
+ public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
+ throws IOException {
+ UserGroupInformation user = checkAccess("transitionToStandby");
+ // TODO (YARN-1177): When automatic failover is enabled,
+ // check if transition should be allowed for this request
+ try {
+ transitionToStandby(true);
+ RMAuditLogger.logSuccess(user.getShortUserName(),
+ "transitionToStandby", "RMHAProtocolService");
+ } catch (Exception e) {
+ RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
+ adminAcl.toString(), "RMHAProtocolService",
+ "Exception transitioning to standby");
+ throw new ServiceFailedException(
+ "Error when transitioning to Standby mode", e);
+ }
+ }
+
+ @Override
+ public synchronized HAServiceStatus getServiceStatus() throws IOException {
+ checkAccess("getServiceState");
+ HAServiceStatus ret = new HAServiceStatus(haState);
+ if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
+ HAServiceProtocol.HAServiceState.STANDBY) {
+ ret.setReadyToBecomeActive();
+ } else {
+ ret.setNotReadyToBecomeActive("State is " + haState);
+ }
+ return ret;
+ }
+
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshQueues");
+
+ if (!isRMActive()) {
+ RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
+ adminAcl.toString(), "AdminService",
+ "ResourceManager is not active. Can not refresh queues.");
+ throw new RMNotYetActiveException();
+ }
+
try {
- scheduler.reinitialize(conf, this.rmContext);
+ rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues",
"AdminService");
return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
@@ -162,8 +306,16 @@ public class AdminService extends Abstra
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshNodes");
+
+ if (!isRMActive()) {
+ RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
+ adminAcl.toString(), "AdminService",
+ "ResourceManager is not active. Can not refresh nodes.");
+ throw new RMNotYetActiveException();
+ }
+
try {
- this.nodesListManager.refreshNodes(new YarnConfiguration());
+ rmContext.getNodesListManager().refreshNodes(new YarnConfiguration());
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes",
"AdminService");
return recordFactory.newRecordInstance(RefreshNodesResponse.class);
@@ -180,7 +332,16 @@ public class AdminService extends Abstra
RefreshSuperUserGroupsConfigurationRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration");
-
+
+ // TODO (YARN-1459): Revisit handling super-user-groups on Standby RM
+ if (!isRMActive()) {
+ RMAuditLogger.logFailure(user.getShortUserName(),
+ "refreshSuperUserGroupsConfiguration",
+ adminAcl.toString(), "AdminService",
+ "ResourceManager is not active. Can not refresh super-user-groups.");
+ throw new RMNotYetActiveException();
+ }
+
ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration());
RMAuditLogger.logSuccess(user.getShortUserName(),
"refreshSuperUserGroupsConfiguration", "AdminService");
@@ -193,7 +354,16 @@ public class AdminService extends Abstra
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request) throws YarnException {
UserGroupInformation user = checkAcls("refreshUserToGroupsMappings");
-
+
+ // TODO (YARN-1459): Revisit handling user-groups on Standby RM
+ if (!isRMActive()) {
+ RMAuditLogger.logFailure(user.getShortUserName(),
+ "refreshUserToGroupsMapping",
+ adminAcl.toString(), "AdminService",
+ "ResourceManager is not active. Can not refresh user-groups.");
+ throw new RMNotYetActiveException();
+ }
+
Groups.getUserToGroupsMappingService().refresh();
RMAuditLogger.logSuccess(user.getShortUserName(),
"refreshUserToGroupsMappings", "AdminService");
@@ -233,9 +403,16 @@ public class AdminService extends Abstra
PolicyProvider policyProvider = new RMPolicyProvider();
refreshServiceAcls(conf, policyProvider);
- clientRMService.refreshServiceAcls(conf, policyProvider);
- applicationMasterService.refreshServiceAcls(conf, policyProvider);
- resourceTrackerService.refreshServiceAcls(conf, policyProvider);
+ if (isRMActive()) {
+ rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
+ rmContext.getApplicationMasterService().refreshServiceAcls(
+ conf, policyProvider);
+ rmContext.getResourceTrackerService().refreshServiceAcls(
+ conf, policyProvider);
+ } else {
+ LOG.warn("ResourceManager is not active. Not refreshing ACLs for " +
+ "Clients, ApplicationMasters and NodeManagers");
+ }
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
@@ -249,5 +426,4 @@ public class AdminService extends Abstra
public String[] getGroupsForUser(String user) throws IOException {
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
-
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Tue Dec 3 17:33:46 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -64,12 +65,22 @@ public interface RMContext {
NMTokenSecretManagerInRM getNMTokenSecretManager();
+ ResourceScheduler getScheduler();
+
+ NodesListManager getNodesListManager();
+
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
-
- void setClientRMService(ClientRMService clientRMService);
-
+
+ AdminService getRMAdminService();
+
ClientRMService getClientRMService();
-
+
+ ApplicationMasterService getApplicationMasterService();
+
+ ResourceTrackerService getResourceTrackerService();
+
+ void setClientRMService(ClientRMService clientRMService);
+
RMDelegationTokenSecretManager getRMDelegationTokenSecretManager();
void setRMDelegationTokenSecretManager(
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Tue Dec 3 17:33:46 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -42,7 +43,7 @@ import com.google.common.annotations.Vis
public class RMContextImpl implements RMContext {
- private final Dispatcher rmDispatcher;
+ private Dispatcher rmDispatcher;
private final ConcurrentMap<ApplicationId, RMApp> applications
= new ConcurrentHashMap<ApplicationId, RMApp>();
@@ -57,34 +58,25 @@ public class RMContextImpl implements RM
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
- private final DelegationTokenRenewer delegationTokenRenewer;
- private final AMRMTokenSecretManager amRMTokenSecretManager;
- private final RMContainerTokenSecretManager containerTokenSecretManager;
- private final NMTokenSecretManagerInRM nmTokenSecretManager;
- private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+ private DelegationTokenRenewer delegationTokenRenewer;
+ private AMRMTokenSecretManager amRMTokenSecretManager;
+ private RMContainerTokenSecretManager containerTokenSecretManager;
+ private NMTokenSecretManagerInRM nmTokenSecretManager;
+ private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+ private AdminService adminService;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
+ private ResourceScheduler scheduler;
+ private NodesListManager nodesListManager;
+ private ResourceTrackerService resourceTrackerService;
+ private ApplicationMasterService applicationMasterService;
+
+ /**
+ * Default constructor. To be used in conjunction with setter methods for
+ * individual fields.
+ */
+ public RMContextImpl() {
- public RMContextImpl(Dispatcher rmDispatcher,
- RMStateStore store,
- ContainerAllocationExpirer containerAllocationExpirer,
- AMLivelinessMonitor amLivelinessMonitor,
- AMLivelinessMonitor amFinishingMonitor,
- DelegationTokenRenewer delegationTokenRenewer,
- AMRMTokenSecretManager amRMTokenSecretManager,
- RMContainerTokenSecretManager containerTokenSecretManager,
- NMTokenSecretManagerInRM nmTokenSecretManager,
- ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
- this.rmDispatcher = rmDispatcher;
- this.stateStore = store;
- this.containerAllocationExpirer = containerAllocationExpirer;
- this.amLivelinessMonitor = amLivelinessMonitor;
- this.amFinishingMonitor = amFinishingMonitor;
- this.delegationTokenRenewer = delegationTokenRenewer;
- this.amRMTokenSecretManager = amRMTokenSecretManager;
- this.containerTokenSecretManager = containerTokenSecretManager;
- this.nmTokenSecretManager = nmTokenSecretManager;
- this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
}
@VisibleForTesting
@@ -98,10 +90,17 @@ public class RMContextImpl implements RM
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
- this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
- amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager,
- containerTokenSecretManager, nmTokenSecretManager,
- clientToAMTokenSecretManager);
+ this();
+ this.setDispatcher(rmDispatcher);
+ this.setContainerAllocationExpirer(containerAllocationExpirer);
+ this.setAMLivelinessMonitor(amLivelinessMonitor);
+ this.setAMFinishingMonitor(amFinishingMonitor);
+ this.setDelegationTokenRenewer(delegationTokenRenewer);
+ this.setAMRMTokenSecretManager(appTokenSecretManager);
+ this.setContainerTokenSecretManager(containerTokenSecretManager);
+ this.setNMTokenSecretManager(nmTokenSecretManager);
+ this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
+
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
@@ -171,12 +170,27 @@ public class RMContextImpl implements RM
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
return this.nmTokenSecretManager;
}
-
+
+ @Override
+ public ResourceScheduler getScheduler() {
+ return this.scheduler;
+ }
+
+ @Override
+ public NodesListManager getNodesListManager() {
+ return this.nodesListManager;
+ }
+
@Override
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
-
+
+ @Override
+ public AdminService getRMAdminService() {
+ return this.adminService;
+ }
+
@VisibleForTesting
public void setStateStore(RMStateStore store) {
stateStore = store;
@@ -186,7 +200,25 @@ public class RMContextImpl implements RM
public ClientRMService getClientRMService() {
return this.clientRMService;
}
-
+
+ @Override
+ public ApplicationMasterService getApplicationMasterService() {
+ return applicationMasterService;
+ }
+
+ @Override
+ public ResourceTrackerService getResourceTrackerService() {
+ return resourceTrackerService;
+ }
+
+ void setDispatcher(Dispatcher dispatcher) {
+ this.rmDispatcher = dispatcher;
+ }
+
+ void setRMAdminService(AdminService adminService) {
+ this.adminService = adminService;
+ }
+
@Override
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
@@ -202,4 +234,60 @@ public class RMContextImpl implements RM
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
+
+ void setContainerAllocationExpirer(
+ ContainerAllocationExpirer containerAllocationExpirer) {
+ this.containerAllocationExpirer = containerAllocationExpirer;
+ }
+
+ void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
+ this.amLivelinessMonitor = amLivelinessMonitor;
+ }
+
+ void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
+ this.amFinishingMonitor = amFinishingMonitor;
+ }
+
+ void setContainerTokenSecretManager(
+ RMContainerTokenSecretManager containerTokenSecretManager) {
+ this.containerTokenSecretManager = containerTokenSecretManager;
+ }
+
+ void setNMTokenSecretManager(
+ NMTokenSecretManagerInRM nmTokenSecretManager) {
+ this.nmTokenSecretManager = nmTokenSecretManager;
+ }
+
+ void setScheduler(ResourceScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ void setDelegationTokenRenewer(
+ DelegationTokenRenewer delegationTokenRenewer) {
+ this.delegationTokenRenewer = delegationTokenRenewer;
+ }
+
+ void setClientToAMTokenSecretManager(
+ ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
+ this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
+ }
+
+ void setAMRMTokenSecretManager(
+ AMRMTokenSecretManager amRMTokenSecretManager) {
+ this.amRMTokenSecretManager = amRMTokenSecretManager;
+ }
+
+ void setNodesListManager(NodesListManager nodesListManager) {
+ this.nodesListManager = nodesListManager;
+ }
+
+ void setApplicationMasterService(
+ ApplicationMasterService applicationMasterService) {
+ this.applicationMasterService = applicationMasterService;
+ }
+
+ void setResourceTrackerService(
+ ResourceTrackerService resourceTrackerService) {
+ this.resourceTrackerService = resourceTrackerService;
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Dec 3 17:33:46 2013
@@ -118,7 +118,9 @@ public class ResourceManager extends Com
* the HA state of the RM.
*/
@VisibleForTesting
- protected RMHAProtocolService haService;
+ protected RMContextImpl rmContext;
+ @VisibleForTesting
+ protected AdminService adminService;
/**
* "Active" services. Services that need to run only on the Active RM.
@@ -129,8 +131,7 @@ public class ResourceManager extends Com
* in Active state.
*/
protected RMActiveServices activeServices;
- protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager =
- new ClientToAMTokenSecretManagerInRM();
+ protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
protected RMContainerTokenSecretManager containerTokenSecretManager;
protected NMTokenSecretManagerInRM nmTokenSecretManager;
@@ -143,7 +144,6 @@ public class ResourceManager extends Com
private ClientRMService clientRM;
protected ApplicationMasterService masterService;
private ApplicationMasterLauncher applicationMasterLauncher;
- private AdminService adminService;
private ContainerAllocationExpirer containerAllocationExpirer;
protected NMLivelinessMonitor nmLivelinessMonitor;
protected NodesListManager nodesListManager;
@@ -154,7 +154,6 @@ public class ResourceManager extends Com
protected RMDelegationTokenSecretManager rmDTSecretManager;
private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp;
- protected RMContext rmContext;
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
@@ -166,10 +165,6 @@ public class ResourceManager extends Com
super("ResourceManager");
}
- public RMHAProtocolService getHAService() {
- return this.haService;
- }
-
public RMContext getRMContext() {
return this.rmContext;
}
@@ -187,9 +182,12 @@ public class ResourceManager extends Com
protected void serviceInit(Configuration conf) throws Exception {
validateConfigs(conf);
this.conf = conf;
+ this.rmContext = new RMContextImpl();
+
+ adminService = createAdminService();
+ addService(adminService);
+ rmContext.setRMAdminService(adminService);
- haService = createRMHAProtocolService();
- addService(haService);
super.serviceInit(conf);
}
@@ -201,11 +199,7 @@ public class ResourceManager extends Com
@VisibleForTesting
protected void setRMStateStore(RMStateStore rmStore) {
rmStore.setRMDispatcher(rmDispatcher);
- ((RMContextImpl) rmContext).setStateStore(rmStore);
- }
-
- protected RMHAProtocolService createRMHAProtocolService() {
- return new RMHAProtocolService(this);
+ rmContext.setStateStore(rmStore);
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
@@ -224,7 +218,8 @@ public class ResourceManager extends Com
protected RMStateStoreOperationFailedEventDispatcher
createRMStateStoreOperationFailedEventDispatcher() {
- return new RMStateStoreOperationFailedEventDispatcher(haService);
+ return new RMStateStoreOperationFailedEventDispatcher(
+ rmContext.getRMAdminService());
}
protected Dispatcher createDispatcher() {
@@ -319,20 +314,31 @@ public class ResourceManager extends Com
rmDispatcher = createDispatcher();
addIfService(rmDispatcher);
+ rmContext.setDispatcher(rmDispatcher);
+
+ clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM();
+ rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
+ rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
addService(containerAllocationExpirer);
+ rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
+ rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
+ rmContext.setAMFinishingMonitor(amFinishingMonitor);
containerTokenSecretManager = createContainerTokenSecretManager(conf);
+ rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
+
nmTokenSecretManager = createNMTokenSecretManager(conf);
+ rmContext.setNMTokenSecretManager(nmTokenSecretManager);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
@@ -358,24 +364,23 @@ public class ResourceManager extends Com
LOG.error("Failed to init state store", e);
ExitUtil.terminate(1, e);
}
+ rmContext.setStateStore(rmStore);
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenRenewer = createDelegationTokenRenewer();
+ rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
- rmContext = new RMContextImpl(
- rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor,
- amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager,
- containerTokenSecretManager, nmTokenSecretManager,
- clientToAMSecretManager);
-
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
addService(nodesListManager);
+ rmContext.setNodesListManager(nodesListManager);
// Initialize the scheduler
scheduler = createScheduler();
+ rmContext.setScheduler(scheduler);
+
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
@@ -397,6 +402,7 @@ public class ResourceManager extends Com
resourceTracker = createResourceTrackerService();
addService(resourceTracker);
+ rmContext.setResourceTrackerService(resourceTracker);
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
@@ -412,6 +418,7 @@ public class ResourceManager extends Com
masterService = createApplicationMasterService();
addService(masterService) ;
+ rmContext.setApplicationMasterService(masterService);
applicationACLsManager = new ApplicationACLsManager(conf);
@@ -422,12 +429,11 @@ public class ResourceManager extends Com
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
+
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
addService(clientRM);
-
- adminService = createAdminService(clientRM, masterService, resourceTracker);
- addService(adminService);
+ rmContext.setClientRMService(clientRM);
applicationMasterLauncher = createAMLauncher();
rmDispatcher.register(AMLauncherEventType.class,
@@ -649,11 +655,11 @@ public class ResourceManager extends Com
@Private
public static class RMStateStoreOperationFailedEventDispatcher implements
EventHandler<RMStateStoreOperationFailedEvent> {
- private final RMHAProtocolService haService;
+ private final AdminService adminService;
public RMStateStoreOperationFailedEventDispatcher(
- RMHAProtocolService haService) {
- this.haService = haService;
+ AdminService adminService) {
+ this.adminService = adminService;
}
@Override
@@ -665,12 +671,12 @@ public class ResourceManager extends Com
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
LOG.info("RMStateStore has been fenced");
- synchronized(haService) {
- if (haService.haEnabled) {
+ synchronized(adminService) {
+ if (adminService.haEnabled) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
- haService.transitionToStandby(true);
+ adminService.transitionToStandby(true);
return;
} catch (Exception e) {
LOG.error("Failed to transition RM to Standby mode.");
@@ -853,6 +859,9 @@ public class ResourceManager extends Com
if (activeServices != null) {
activeServices.stop();
activeServices = null;
+ rmContext.getRMNodes().clear();
+ rmContext.getInactiveRMNodes().clear();
+ rmContext.getRMApps().clear();
}
}
@@ -913,13 +922,8 @@ public class ResourceManager extends Com
return new ApplicationMasterService(this.rmContext, scheduler);
}
- protected AdminService createAdminService(
- ClientRMService clientRMService,
- ApplicationMasterService applicationMasterService,
- ResourceTrackerService resourceTrackerService) {
- return new AdminService(this.conf, scheduler, rmContext,
- this.nodesListManager, clientRMService, applicationMasterService,
- resourceTrackerService);
+ protected AdminService createAdminService() {
+ return new AdminService(this, rmContext);
}
@Private
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java Tue Dec 3 17:33:46 2013
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.server.api
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMPolicyProvider extends PolicyProvider {
-
+
private static final Service[] resourceManagerServices =
new Service[] {
new Service(
@@ -53,9 +53,6 @@ public class RMPolicyProvider extends Po
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL,
ContainerManagementProtocolPB.class),
- new Service(
- YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL,
- HAServiceProtocol.class),
};
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Dec 3 17:33:46 2013
@@ -307,16 +307,6 @@ public class MockRM extends ResourceMana
}
@Override
- protected RMHAProtocolService createRMHAProtocolService() {
- return new RMHAProtocolService(this) {
- @Override
- protected void startHAAdminServer() {
- // do nothing
- }
- };
- }
-
- @Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager, queueACLsManager,
@@ -391,19 +381,15 @@ public class MockRM extends ResourceMana
}
@Override
- protected AdminService createAdminService(ClientRMService clientRMService,
- ApplicationMasterService applicationMasterService,
- ResourceTrackerService resourceTrackerService) {
- return new AdminService(getConfig(), scheduler, getRMContext(),
- this.nodesListManager, clientRMService, applicationMasterService,
- resourceTrackerService) {
+ protected AdminService createAdminService() {
+ return new AdminService(this, getRMContext()) {
@Override
- protected void serviceStart() {
+ protected void startServer() {
// override to not start rpc handler
}
@Override
- protected void serviceStop() {
+ protected void stopServer() {
// don't do anything
}
};
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Tue Dec 3 17:33:46 2013
@@ -62,7 +62,7 @@ public class TestRMHA {
private void checkMonitorHealth() throws IOException {
try {
- rm.haService.monitorHealth();
+ rm.adminService.monitorHealth();
} catch (HealthCheckFailedException e) {
fail("The RM is in bad health: it is Active, but the active services " +
"are not running");
@@ -71,20 +71,20 @@ public class TestRMHA {
private void checkStandbyRMFunctionality() throws IOException {
assertEquals(STATE_ERR, HAServiceState.STANDBY,
- rm.haService.getServiceStatus().getState());
+ rm.adminService.getServiceStatus().getState());
assertFalse("Active RM services are started",
rm.areActiveServicesRunning());
assertTrue("RM is not ready to become active",
- rm.haService.getServiceStatus().isReadyToBecomeActive());
+ rm.adminService.getServiceStatus().isReadyToBecomeActive());
}
private void checkActiveRMFunctionality() throws IOException {
assertEquals(STATE_ERR, HAServiceState.ACTIVE,
- rm.haService.getServiceStatus().getState());
+ rm.adminService.getServiceStatus().getState());
assertTrue("Active RM services aren't started",
rm.areActiveServicesRunning());
assertTrue("RM is not ready to become active",
- rm.haService.getServiceStatus().isReadyToBecomeActive());
+ rm.adminService.getServiceStatus().isReadyToBecomeActive());
try {
rm.getNewAppId();
@@ -113,9 +113,9 @@ public class TestRMHA {
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
- rm.haService.getServiceStatus().getState());
+ rm.adminService.getServiceStatus().getState());
assertFalse("RM is ready to become active before being started",
- rm.haService.getServiceStatus().isReadyToBecomeActive());
+ rm.adminService.getServiceStatus().isReadyToBecomeActive());
checkMonitorHealth();
rm.start();
@@ -123,27 +123,27 @@ public class TestRMHA {
checkStandbyRMFunctionality();
// 1. Transition to Standby - must be a no-op
- rm.haService.transitionToStandby(requestInfo);
+ rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 2. Transition to active
- rm.haService.transitionToActive(requestInfo);
+ rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
// 3. Transition to active - no-op
- rm.haService.transitionToActive(requestInfo);
+ rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
// 4. Transition to standby
- rm.haService.transitionToStandby(requestInfo);
+ rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 5. Transition to active to check Active->Standby->Active works
- rm.haService.transitionToActive(requestInfo);
+ rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
@@ -151,9 +151,9 @@ public class TestRMHA {
// become active
rm.stop();
assertEquals(STATE_ERR, HAServiceState.STOPPING,
- rm.haService.getServiceStatus().getState());
+ rm.adminService.getServiceStatus().getState());
assertFalse("RM is ready to become active even after it is stopped",
- rm.haService.getServiceStatus().isReadyToBecomeActive());
+ rm.adminService.getServiceStatus().isReadyToBecomeActive());
assertFalse("Active RM services are started",
rm.areActiveServicesRunning());
checkMonitorHealth();
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Tue Dec 3 17:33:46 2013
@@ -129,7 +129,8 @@ public class TestZKRMStateStore extends
for (String rpcAddress : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
}
- conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
+ conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
+ "localhost:" + adminPort);
return conf;
}
@@ -143,23 +144,23 @@ public class TestZKRMStateStore extends
ResourceManager rm1 = new ResourceManager();
rm1.init(conf1);
rm1.start();
- rm1.getHAService().transitionToActive(req);
+ rm1.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm1.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
- rm1.getHAService().getServiceStatus().getState());
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
ResourceManager rm2 = new ResourceManager();
rm2.init(conf2);
rm2.start();
- rm2.getHAService().transitionToActive(req);
+ rm2.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm2.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
- rm2.getHAService().getServiceStatus().getState());
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
// Submitting an application to RM1 to trigger a state store operation.
// RM1 should realize that it got fenced and is not the Active RM anymore.
@@ -181,16 +182,16 @@ public class TestZKRMStateStore extends
rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
for (int i = 0; i < 30; i++) {
- if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getHAService()
- .getServiceStatus().getState()) {
+ if (HAServiceProtocol.HAServiceState.ACTIVE ==
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
Thread.sleep(100);
}
}
assertEquals("RM should have been fenced",
HAServiceProtocol.HAServiceState.STANDBY,
- rm1.getHAService().getServiceStatus().getState());
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
- rm2.getHAService().getServiceStatus().getState());
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1547492&r1=1547491&r2=1547492&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Dec 3 17:33:46 2013
@@ -179,12 +179,13 @@ public class TestRMAppTransitions {
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
this.rmContext =
- new RMContextImpl(rmDispatcher, store,
+ new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM());
+ ((RMContextImpl)rmContext).setStateStore(store);
rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher(this.rmContext));