You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/02/07 02:57:30 UTC
svn commit: r1565515 [2/3] - in
/hadoop/common/branches/HDFS-5698/hadoop-yarn-project: ./ hadoop-yarn/bin/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop...
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java Fri Feb 7 01:57:21 2014
@@ -40,7 +40,7 @@ public class TestApplicationHistoryServe
Configuration config = new YarnConfiguration();
historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState());
- assertEquals(2, historyServer.getServices().size());
+ assertEquals(3, historyServer.getServices().size());
ApplicationHistoryClientService historyService =
historyServer.getClientService();
assertNotNull(historyServer.getClientService());
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java Fri Feb 7 01:57:21 2014
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Text;
@@ -312,7 +313,7 @@ public class BuilderUtils {
String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
- float progress, String appType, Token amRmToken) {
+ float progress, String appType, Token amRmToken, Set<String> tags) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
@@ -334,6 +335,7 @@ public class BuilderUtils {
report.setProgress(progress);
report.setApplicationType(appType);
report.setAMRMToken(amRmToken);
+ report.setApplicationTags(tags);
return report;
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Fri Feb 7 01:57:21 2014
@@ -45,8 +45,11 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -72,6 +75,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
public class AdminService extends CompositeService implements
@@ -89,6 +93,8 @@ public class AdminService extends Compos
private InetSocketAddress masterServiceAddress;
private AccessControlList adminAcl;
+ private ConfigurationProvider configurationProvider = null;
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -109,6 +115,10 @@ public class AdminService extends Compos
}
}
+ this.configurationProvider =
+ ConfigurationProviderFactory.getConfigurationProvider(conf);
+ configurationProvider.init(conf);
+
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -129,6 +139,9 @@ public class AdminService extends Compos
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
+ if (this.configurationProvider != null) {
+ configurationProvider.close();
+ }
super.serviceStop();
}
@@ -295,23 +308,28 @@ public class AdminService extends Compos
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnException, StandbyException {
- UserGroupInformation user = checkAcls("refreshQueues");
+ String argName = "refreshQueues";
+ UserGroupInformation user = checkAcls(argName);
if (!isRMActive()) {
- RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
+ RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh queues.");
throwStandbyException();
}
+ RefreshQueuesResponse response =
+ recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try {
- rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
- RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues",
+ Configuration conf =
+ getConfiguration(YarnConfiguration.CS_CONFIGURATION_FILE);
+ rmContext.getScheduler().reinitialize(conf, this.rmContext);
+ RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
- return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
+ return response;
} catch (IOException ioe) {
LOG.info("Exception refreshing queues ", ioe);
- RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
+ RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"Exception refreshing queues");
throw RPCUtil.getRemoteException(ioe);
@@ -346,21 +364,22 @@ public class AdminService extends Compos
@Override
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
- throws YarnException, StandbyException {
- UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration");
+ throws YarnException, IOException {
+ String argName = "refreshSuperUserGroupsConfiguration";
+ UserGroupInformation user = checkAcls(argName);
- // TODO (YARN-1459): Revisit handling super-user-groups on Standby RM
if (!isRMActive()) {
- RMAuditLogger.logFailure(user.getShortUserName(),
- "refreshSuperUserGroupsConfiguration",
+ RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh super-user-groups.");
throwStandbyException();
}
- ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration());
+ Configuration conf =
+ getConfiguration(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
+ ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
RMAuditLogger.logSuccess(user.getShortUserName(),
- "refreshSuperUserGroupsConfiguration", "AdminService");
+ argName, "AdminService");
return recordFactory.newRecordInstance(
RefreshSuperUserGroupsConfigurationResponse.class);
@@ -391,14 +410,22 @@ public class AdminService extends Compos
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
- RefreshAdminAclsRequest request) throws YarnException {
- UserGroupInformation user = checkAcls("refreshAdminAcls");
+ RefreshAdminAclsRequest request) throws YarnException, IOException {
+ String argName = "refreshAdminAcls";
+ UserGroupInformation user = checkAcls(argName);
- Configuration conf = new Configuration();
+ if (!isRMActive()) {
+ RMAuditLogger.logFailure(user.getShortUserName(), argName,
+ adminAcl.toString(), "AdminService",
+ "ResourceManager is not active. Can not refresh user-groups.");
+ throwStandbyException();
+ }
+ Configuration conf =
+ getConfiguration(YarnConfiguration.YARN_SITE_XML_FILE);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
- RMAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
+ RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return recordFactory.newRecordInstance(RefreshAdminAclsResponse.class);
@@ -406,9 +433,8 @@ public class AdminService extends Compos
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
- RefreshServiceAclsRequest request) throws YarnException {
- Configuration conf = new Configuration();
- if (!conf.getBoolean(
+ RefreshServiceAclsRequest request) throws YarnException, IOException {
+ if (!getConfig().getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
throw RPCUtil.getRemoteException(
@@ -416,27 +442,38 @@ public class AdminService extends Compos
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
") not enabled."));
}
-
+
+ String argName = "refreshServiceAcls";
+ if (!isRMActive()) {
+ RMAuditLogger.logFailure(UserGroupInformation.getCurrentUser()
+ .getShortUserName(), argName,
+ adminAcl.toString(), "AdminService",
+ "ResourceManager is not active. Can not refresh Service ACLs.");
+ throwStandbyException();
+ }
+
PolicyProvider policyProvider = new RMPolicyProvider();
-
+ Configuration conf =
+ getConfiguration(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
+
refreshServiceAcls(conf, policyProvider);
- if (isRMActive()) {
- rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
- rmContext.getApplicationMasterService().refreshServiceAcls(
- conf, policyProvider);
- rmContext.getResourceTrackerService().refreshServiceAcls(
- conf, policyProvider);
- } else {
- LOG.warn("ResourceManager is not active. Not refreshing ACLs for " +
- "Clients, ApplicationMasters and NodeManagers");
- }
+ rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
+ rmContext.getApplicationMasterService().refreshServiceAcls(
+ conf, policyProvider);
+ rmContext.getResourceTrackerService().refreshServiceAcls(
+ conf, policyProvider);
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
- void refreshServiceAcls(Configuration configuration,
+ synchronized void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
- this.server.refreshServiceAcl(configuration, policyProvider);
+ if (this.configurationProvider instanceof LocalConfigurationProvider) {
+ this.server.refreshServiceAcl(configuration, policyProvider);
+ } else {
+ this.server.refreshServiceAclWithConfigration(configuration,
+ policyProvider);
+ }
}
@Override
@@ -483,5 +520,19 @@ public class AdminService extends Compos
UpdateNodeResourceResponse.class);
return response;
}
-
+
+ private synchronized Configuration getConfiguration(String confFileName)
+ throws YarnException, IOException {
+ return this.configurationProvider.getConfiguration(confFileName);
+ }
+
+ @VisibleForTesting
+ public AccessControlList getAccessControlList() {
+ return this.adminAcl;
+ }
+
+ @VisibleForTesting
+ public Server getServer() {
+ return this.server;
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Fri Feb 7 01:57:21 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -86,6 +87,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import com.google.common.annotations.VisibleForTesting;
+
@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
@@ -102,6 +105,7 @@ public class ApplicationMasterService ex
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
+ private boolean useLocalConfigurationProvider;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
@@ -112,6 +116,15 @@ public class ApplicationMasterService ex
}
@Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.useLocalConfigurationProvider =
+ (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+ YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+ LocalConfigurationProvider.class)));
+ super.serviceInit(conf);
+ }
+
+ @Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
@@ -578,7 +591,12 @@ public class ApplicationMasterService ex
public void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
- this.server.refreshServiceAcl(configuration, policyProvider);
+ if (this.useLocalConfigurationProvider) {
+ this.server.refreshServiceAcl(configuration, policyProvider);
+ } else {
+ this.server.refreshServiceAclWithConfigration(configuration,
+ policyProvider);
+ }
}
@Override
@@ -604,4 +622,9 @@ public class ApplicationMasterService ex
this.response = response;
}
}
+
+ @VisibleForTesting
+ public Server getServer() {
+ return this.server;
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Fri Feb 7 01:57:21 2014
@@ -43,7 +43,9 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -94,6 +96,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -104,6 +108,10 @@ import org.apache.hadoop.yarn.server.sec
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+
/**
* The client interface to the Resource Manager. This module handles all the rpc
@@ -128,6 +136,7 @@ public class ClientRMService extends Abs
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
+ private boolean useLocalConfigurationProvider;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
@@ -145,6 +154,10 @@ public class ClientRMService extends Abs
@Override
protected void serviceInit(Configuration conf) throws Exception {
clientBindAddress = getBindAddress(conf);
+ this.useLocalConfigurationProvider =
+ (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+ YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+ LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@@ -433,9 +446,11 @@ public class ClientRMService extends Abs
request.getApplicationStates();
Set<String> users = request.getUsers();
Set<String> queues = request.getQueues();
+ Set<String> tags = request.getApplicationTags();
long limit = request.getLimit();
LongRange start = request.getStartRange();
LongRange finish = request.getFinishRange();
+ ApplicationsRequestScope scope = request.getScope();
final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
Iterator<RMApp> appsIter;
@@ -482,6 +497,17 @@ public class ClientRMService extends Abs
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
while (appsIter.hasNext() && reports.size() < limit) {
RMApp application = appsIter.next();
+
+ // Check if current application falls under the specified scope
+ boolean allowAccess = checkAccess(callerUGI, application.getUser(),
+ ApplicationAccessType.VIEW_APP, application);
+ if (scope == ApplicationsRequestScope.OWN &&
+ !callerUGI.getUserName().equals(application.getUser())) {
+ continue;
+ } else if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) {
+ continue;
+ }
+
if (applicationTypes != null && !applicationTypes.isEmpty()) {
String appTypeToMatch = caseSensitive
? application.getApplicationType()
@@ -511,8 +537,23 @@ public class ClientRMService extends Abs
continue;
}
- boolean allowAccess = checkAccess(callerUGI, application.getUser(),
- ApplicationAccessType.VIEW_APP, application);
+ if (tags != null && !tags.isEmpty()) {
+ Set<String> appTags = application.getApplicationTags();
+ if (appTags == null || appTags.isEmpty()) {
+ continue;
+ }
+ boolean match = false;
+ for (String tag : tags) {
+ if (appTags.contains(tag)) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ continue;
+ }
+ }
+
reports.add(application.createAndGetApplicationReport(
callerUGI.getUserName(), allowAccess));
}
@@ -686,10 +727,74 @@ public class ClientRMService extends Abs
}
}
+ @SuppressWarnings("unchecked")
@Override
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request) throws YarnException {
- throw new UnsupportedOperationException("Move not yet supported");
+ ApplicationId applicationId = request.getApplicationId();
+
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie) {
+ LOG.info("Error getting UGI ", ie);
+ RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST,
+ "UNKNOWN", "ClientRMService" , "Error getting UGI",
+ applicationId);
+ throw RPCUtil.getRemoteException(ie);
+ }
+
+ RMApp application = this.rmContext.getRMApps().get(applicationId);
+ if (application == null) {
+ RMAuditLogger.logFailure(callerUGI.getUserName(),
+ AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
+ "Trying to move an absent application", applicationId);
+ throw new ApplicationNotFoundException("Trying to move an absent"
+ + " application " + applicationId);
+ }
+
+ if (!checkAccess(callerUGI, application.getUser(),
+ ApplicationAccessType.MODIFY_APP, application)) {
+ RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+ AuditConstants.MOVE_APP_REQUEST,
+ "User doesn't have permissions to "
+ + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
+ AuditConstants.UNAUTHORIZED_USER, applicationId);
+ throw RPCUtil.getRemoteException(new AccessControlException("User "
+ + callerUGI.getShortUserName() + " cannot perform operation "
+ + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
+ }
+
+ // Moves are only allowed while the app is in a state in which it is
+ // tracked by the scheduler
+ if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED,
+ RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED,
+ RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED)
+ .contains(application.getState())) {
+ String msg = "App in " + application.getState() + " state cannot be moved.";
+ RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+ AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg);
+ throw new YarnException(msg);
+ }
+
+ SettableFuture<Object> future = SettableFuture.create();
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppMoveEvent(applicationId, request.getTargetQueue(), future));
+
+ try {
+ Futures.get(future, YarnException.class);
+ } catch (YarnException ex) {
+ RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+ AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
+ ex.getMessage());
+ throw ex;
+ }
+
+ RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
+ AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
+ MoveApplicationAcrossQueuesResponse response = recordFactory
+ .newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
+ return response;
}
private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
@@ -704,7 +809,12 @@ public class ClientRMService extends Abs
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
- this.server.refreshServiceAcl(configuration, policyProvider);
+ if (this.useLocalConfigurationProvider) {
+ this.server.refreshServiceAcl(configuration, policyProvider);
+ } else {
+ this.server.refreshServiceAclWithConfigration(configuration,
+ policyProvider);
+ }
}
private boolean isAllowedDelegationTokenOp() throws IOException {
@@ -718,4 +828,9 @@ public class ClientRMService extends Abs
return true;
}
}
+
+ @VisibleForTesting
+ public Server getServer() {
+ return this.server;
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Fri Feb 7 01:57:21 2014
@@ -320,7 +320,8 @@ public class RMAppManager implements Eve
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
- submitTime, submissionContext.getApplicationType());
+ submitTime, submissionContext.getApplicationType(),
+ submissionContext.getApplicationTags());
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java Fri Feb 7 01:57:21 2014
@@ -45,6 +45,7 @@ public class RMAuditLogger {
public static final String KILL_APP_REQUEST = "Kill Application Request";
public static final String SUBMIT_APP_REQUEST = "Submit Application Request";
+ public static final String MOVE_APP_REQUEST = "Move Application Request";
public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Fri Feb 7 01:57:21 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -66,6 +67,8 @@ import org.apache.hadoop.yarn.server.uti
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import com.google.common.annotations.VisibleForTesting;
+
public class ResourceTrackerService extends AbstractService implements
ResourceTracker {
@@ -92,6 +95,7 @@ public class ResourceTrackerService exte
private int minAllocMb;
private int minAllocVcores;
+ private boolean useLocalConfigurationProvider;
static {
resync.setNodeAction(NodeAction.RESYNC);
@@ -141,6 +145,10 @@ public class ResourceTrackerService exte
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
+ this.useLocalConfigurationProvider =
+ (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+ YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+ LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@@ -415,6 +423,16 @@ public class ResourceTrackerService exte
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
- this.server.refreshServiceAcl(configuration, policyProvider);
+ if (this.useLocalConfigurationProvider) {
+ this.server.refreshServiceAcl(configuration, policyProvider);
+ } else {
+ this.server.refreshServiceAclWithConfigration(configuration,
+ policyProvider);
+ }
+ }
+
+ @VisibleForTesting
+ public Server getServer() {
+ return this.server;
}
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Fri Feb 7 01:57:21 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -194,7 +195,13 @@ public interface RMApp extends EventHand
* Returns the application type
* @return the application type.
*/
- String getApplicationType();
+ String getApplicationType();
+
+ /**
+ * Get tags for the application
+ * @return tags corresponding to the application
+ */
+ Set<String> getApplicationTags();
/**
* Check whether this application is safe to terminate.
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Fri Feb 7 01:57:21 2014
@@ -23,6 +23,7 @@ public enum RMAppEventType {
START,
RECOVER,
KILL,
+ MOVE, // Move app to a new queue
// Source: Scheduler and RMAppManager
APP_REJECTED,
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Feb 7 01:57:21 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@@ -103,6 +104,7 @@ public class RMAppImpl implements RMApp,
private final long submitTime;
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
private final String applicationType;
+ private final Set<String> applicationTags;
// Mutable fields
private long startTime;
@@ -166,6 +168,8 @@ public class RMAppImpl implements RMApp,
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@@ -181,6 +185,8 @@ public class RMAppImpl implements RMApp,
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
@@ -190,10 +196,8 @@ public class RMAppImpl implements RMApp,
// waiting for the previous AM to exit.
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.ACCEPTED))
- .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
- new FinalSavingTransition(
- new AppKilledTransition(), RMAppState.KILLED))
+ .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
+ RMAppEventType.KILL, new KillAttemptTransition())
// ACCEPTED state can once again receive APP_ACCEPTED event, because on
// recovery the app returns ACCEPTED state and the app once again goes
// through the scheduler and triggers one more APP_ACCEPTED event at
@@ -204,6 +208,8 @@ public class RMAppImpl implements RMApp,
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@@ -295,9 +301,9 @@ public class RMAppImpl implements RMApp,
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
- ApplicationSubmissionContext submissionContext,
- YarnScheduler scheduler,
- ApplicationMasterService masterService, long submitTime, String applicationType) {
+ ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
+ ApplicationMasterService masterService, long submitTime,
+ String applicationType, Set<String> applicationTags) {
this.applicationId = applicationId;
this.name = name;
@@ -313,6 +319,7 @@ public class RMAppImpl implements RMApp,
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.applicationType = applicationType;
+ this.applicationTags = applicationTags;
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -546,7 +553,7 @@ public class RMAppImpl implements RMApp,
createApplicationState(), diags,
trackingUrl, this.startTime, this.finishTime, finishState,
appUsageReport, origTrackingUrl, progress, this.applicationType,
- amrmToken);
+ amrmToken, applicationTags);
} finally {
this.readLock.unlock();
}
@@ -692,6 +699,31 @@ public class RMAppImpl implements RMApp,
};
}
+ /**
+ * Move an app to a new queue.
+ * This transition must set the result on the Future in the RMAppMoveEvent,
+ * either as an exception for failure or null for success, or the client will
+ * be left waiting forever.
+ */
+ @SuppressWarnings("unchecked")
+ private static final class RMAppMoveTransition extends RMAppTransition {
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
+ try {
+ app.queue = app.scheduler.moveApplication(app.applicationId,
+ moveEvent.getTargetQueue());
+ } catch (YarnException ex) {
+ moveEvent.getResult().setException(ex);
+ return;
+ }
+
+ // TODO: Write out change to state store (YARN-1558)
+
+ moveEvent.getResult().set(null);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@@ -1054,6 +1086,11 @@ public class RMAppImpl implements RMApp,
}
@Override
+ public Set<String> getApplicationTags() {
+ return this.applicationTags;
+ }
+
+ @Override
public boolean isAppSafeToTerminate() {
RMAppState state = getState();
return state.equals(RMAppState.FINISHING)
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Fri Feb 7 01:57:21 2014
@@ -27,11 +27,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-public class AbstractYarnScheduler {
+public abstract class AbstractYarnScheduler implements ResourceScheduler {
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication> applications;
@@ -61,4 +62,11 @@ public class AbstractYarnScheduler {
public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
return applications;
}
+
+ @Override
+ public String moveApplication(ApplicationId appId, String newQueue)
+ throws YarnException {
+ throw new YarnException(getClass().getSimpleName()
+ + " does not support moving apps between queues");
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Feb 7 01:57:21 2014
@@ -64,7 +64,7 @@ public class AppSchedulingInfo {
private Set<String> blacklist = new HashSet<String>();
//private final ApplicationStore store;
- private final ActiveUsersManager activeUsersManager;
+ private ActiveUsersManager activeUsersManager;
/* Allocated by scheduler */
boolean pending = true; // for app metrics
@@ -171,11 +171,10 @@ public class AppSchedulingInfo {
.getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest
.getCapability() : Resources.none();
- metrics.incrPendingResources(user, request.getNumContainers()
- - lastRequestContainers, Resources.subtractFrom( // save a clone
- Resources.multiply(request.getCapability(), request
- .getNumContainers()), Resources.multiply(lastRequestCapability,
- lastRequestContainers)));
+ metrics.incrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+ metrics.decrPendingResources(user, lastRequestContainers,
+ lastRequestCapability);
}
}
}
@@ -262,9 +261,15 @@ public class AppSchedulingInfo {
pending = false;
metrics.runAppAttempt(applicationId, user);
}
- LOG.debug("allocate: user: " + user + ", memory: "
- + request.getCapability());
- metrics.allocateResources(user, 1, request.getCapability());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocate: applicationId=" + applicationId
+ + " container=" + container.getId()
+ + " host=" + container.getNodeId().toString()
+ + " user=" + user
+ + " resource=" + request.getCapability());
+ }
+ metrics.allocateResources(user, 1, request.getCapability(), true);
}
/**
@@ -277,9 +282,6 @@ public class AppSchedulingInfo {
synchronized private void allocateNodeLocal(
SchedulerNode node, Priority priority,
ResourceRequest nodeLocalRequest, Container container) {
- // Update consumption and track allocations
- allocate(container);
-
// Update future requirements
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
if (nodeLocalRequest.getNumContainers() == 0) {
@@ -306,10 +308,6 @@ public class AppSchedulingInfo {
synchronized private void allocateRackLocal(
SchedulerNode node, Priority priority,
ResourceRequest rackLocalRequest, Container container) {
-
- // Update consumption and track allocations
- allocate(container);
-
// Update future requirements
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
if (rackLocalRequest.getNumContainers() == 0) {
@@ -329,10 +327,6 @@ public class AppSchedulingInfo {
synchronized private void allocateOffSwitch(
SchedulerNode node, Priority priority,
ResourceRequest offSwitchRequest, Container container) {
-
- // Update consumption and track allocations
- allocate(container);
-
// Update future requirements
decrementOutstanding(offSwitchRequest);
}
@@ -365,18 +359,24 @@ public class AppSchedulingInfo {
}
}
- synchronized private void allocate(Container container) {
- // Update consumption and track allocations
- //TODO: fixme sharad
- /* try {
- store.storeContainer(container);
- } catch (IOException ie) {
- // TODO fix this. we shouldnt ignore
- }*/
-
- LOG.debug("allocate: applicationId=" + applicationId + " container="
- + container.getId() + " host="
- + container.getNodeId().toString());
+ synchronized public void move(Queue newQueue) {
+ QueueMetrics oldMetrics = queue.getMetrics();
+ QueueMetrics newMetrics = newQueue.getMetrics();
+ for (Map<String, ResourceRequest> asks : requests.values()) {
+ ResourceRequest request = asks.get(ResourceRequest.ANY);
+ if (request != null) {
+ oldMetrics.decrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+ newMetrics.incrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+ }
+ }
+ oldMetrics.moveAppFrom(this);
+ newMetrics.moveAppTo(this);
+ activeUsersManager.deactivateApplication(user, applicationId);
+ activeUsersManager = newQueue.getActiveUsersManager();
+ activeUsersManager.activateApplication(user, applicationId);
+ this.queue = newQueue;
}
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
@@ -386,8 +386,7 @@ public class AppSchedulingInfo {
ResourceRequest request = asks.get(ResourceRequest.ANY);
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
- Resources.multiply(request.getCapability(), request
- .getNumContainers()));
+ request.getCapability());
}
}
metrics.finishAppAttempt(applicationId, pending, user);
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Fri Feb 7 01:57:21 2014
@@ -58,4 +58,6 @@ public interface Queue {
List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
boolean hasAccess(QueueACL acl, UserGroupInformation user);
+
+ public ActiveUsersManager getActiveUsersManager();
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Fri Feb 7 01:57:21 2014
@@ -280,6 +280,36 @@ public class QueueMetrics implements Met
parent.finishApp(user, rmAppFinalState);
}
}
+
+ public void moveAppFrom(AppSchedulingInfo app) {
+ if (app.isPending()) {
+ appsPending.decr();
+ } else {
+ appsRunning.decr();
+ }
+ QueueMetrics userMetrics = getUserMetrics(app.getUser());
+ if (userMetrics != null) {
+ userMetrics.moveAppFrom(app);
+ }
+ if (parent != null) {
+ parent.moveAppFrom(app);
+ }
+ }
+
+ public void moveAppTo(AppSchedulingInfo app) {
+ if (app.isPending()) {
+ appsPending.incr();
+ } else {
+ appsRunning.incr();
+ }
+ QueueMetrics userMetrics = getUserMetrics(app.getUser());
+ if (userMetrics != null) {
+ userMetrics.moveAppTo(app);
+ }
+ if (parent != null) {
+ parent.moveAppTo(app);
+ }
+ }
/**
* Set available resources. To be called by scheduler periodically as
@@ -324,8 +354,8 @@ public class QueueMetrics implements Met
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
- pendingMB.incr(res.getMemory());
- pendingVCores.incr(res.getVirtualCores());
+ pendingMB.incr(res.getMemory() * containers);
+ pendingVCores.incr(res.getVirtualCores() * containers);
}
public void decrPendingResources(String user, int containers, Resource res) {
@@ -341,22 +371,25 @@ public class QueueMetrics implements Met
private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
- pendingMB.decr(res.getMemory());
- pendingVCores.decr(res.getVirtualCores());
+ pendingMB.decr(res.getMemory() * containers);
+ pendingVCores.decr(res.getVirtualCores() * containers);
}
- public void allocateResources(String user, int containers, Resource res) {
+ public void allocateResources(String user, int containers, Resource res,
+ boolean decrPending) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
- _decrPendingResources(containers, Resources.multiply(res, containers));
+ if (decrPending) {
+ _decrPendingResources(containers, res);
+ }
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
- userMetrics.allocateResources(user, containers, res);
+ userMetrics.allocateResources(user, containers, res, decrPending);
}
if (parent != null) {
- parent.allocateResources(user, containers, res);
+ parent.allocateResources(user, containers, res, decrPending);
}
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Feb 7 01:57:21 2014
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.res
@Unstable
public class SchedulerApplication {
- private final Queue queue;
+ private Queue queue;
private final String user;
private SchedulerApplicationAttempt currentAttempt;
@@ -37,6 +37,10 @@ public class SchedulerApplication {
public Queue getQueue() {
return queue;
}
+
+ public void setQueue(Queue queue) {
+ this.queue = queue;
+ }
public String getUser() {
return user;
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Feb 7 01:57:21 2014
@@ -57,7 +57,7 @@ import com.google.common.collect.Multise
*/
@Private
@Unstable
-public abstract class SchedulerApplicationAttempt {
+public class SchedulerApplicationAttempt {
private static final Log LOG = LogFactory
.getLog(SchedulerApplicationAttempt.class);
@@ -91,7 +91,7 @@ public abstract class SchedulerApplicati
protected Map<Priority, Long> lastScheduledContainer =
new HashMap<Priority, Long>();
- protected final Queue queue;
+ protected Queue queue;
protected boolean isStopped = false;
protected final RMContext rmContext;
@@ -431,4 +431,25 @@ public abstract class SchedulerApplicati
this.appSchedulingInfo
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
}
+
+ public synchronized void move(Queue newQueue) {
+ QueueMetrics oldMetrics = queue.getMetrics();
+ QueueMetrics newMetrics = newQueue.getMetrics();
+ String user = getUser();
+ for (RMContainer liveContainer : liveContainers.values()) {
+ Resource resource = liveContainer.getContainer().getResource();
+ oldMetrics.releaseResources(user, 1, resource);
+ newMetrics.allocateResources(user, 1, resource, false);
+ }
+ for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
+ for (RMContainer reservedContainer : map.values()) {
+ Resource resource = reservedContainer.getReservedResource();
+ oldMetrics.unreserveResource(user, resource);
+ newMetrics.reserveResource(user, resource);
+ }
+ }
+
+ appSchedulingInfo.move(newQueue);
+ this.queue = newQueue;
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Feb 7 01:57:21 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
/**
@@ -180,4 +182,16 @@ public interface YarnScheduler extends E
@LimitedPrivate("yarn")
@Unstable
public RMContainer getRMContainer(ContainerId containerId);
+
+ /**
+ * Moves the given application to the given queue
+ * @param appId the id of the application to move
+ * @param newQueue the name of the queue to move the application to
+ * @return the name of the queue the application was placed into
+ * @throws YarnException if the move cannot be carried out
+ */
+ @LimitedPrivate("yarn")
+ @Evolving
+ public String moveApplication(ApplicationId appId, String newQueue)
+ throws YarnException;
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Feb 7 01:57:21 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -195,6 +196,7 @@ public class CapacityScheduler extends A
private ResourceCalculator calculator;
private boolean usePortForNodeName;
+ private boolean useLocalConfigurationProvider;
public CapacityScheduler() {}
@@ -261,7 +263,13 @@ public class CapacityScheduler extends A
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
if (!initialized) {
- this.conf = new CapacitySchedulerConfiguration(conf);
+ this.useLocalConfigurationProvider =
+ (LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
+ YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+ LocalConfigurationProvider.class)));
+ this.conf =
+ new CapacitySchedulerConfiguration(conf,
+ this.useLocalConfigurationProvider);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
@@ -279,9 +287,10 @@ public class CapacityScheduler extends A
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
"maximumAllocation=<" + getMaximumResourceCapability() + ">");
} else {
-
CapacitySchedulerConfiguration oldConf = this.conf;
- this.conf = new CapacitySchedulerConfiguration(conf);
+ this.conf =
+ new CapacitySchedulerConfiguration(conf,
+ this.useLocalConfigurationProvider);
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Fri Feb 7 01:57:21 2014
@@ -140,10 +140,17 @@ public class CapacitySchedulerConfigurat
}
public CapacitySchedulerConfiguration(Configuration configuration) {
+ this(configuration, true);
+ }
+
+ public CapacitySchedulerConfiguration(Configuration configuration,
+ boolean useLocalConfigurationProvider) {
super(configuration);
- addResource(CS_CONFIGURATION_FILE);
+ if (useLocalConfigurationProvider) {
+ addResource(CS_CONFIGURATION_FILE);
+ }
}
-
+
private String getQueuePrefix(String queue) {
String queueName = PREFIX + queue + DOT;
return queueName;
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Fri Feb 7 01:57:21 2014
@@ -39,7 +39,8 @@ public class AllocationConfiguration {
// Minimum resource allocation for each queue
private final Map<String, Resource> minQueueResources;
// Maximum amount of resources per queue
- private final Map<String, Resource> maxQueueResources;
+ @VisibleForTesting
+ final Map<String, Resource> maxQueueResources;
// Sharing weights for each queue
private final Map<String, ResourceWeights> queueWeights;
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Feb 7 01:57:21 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -54,11 +55,14 @@ public class FSLeafQueue extends FSQueue
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
+ private final ActiveUsersManager activeUsersManager;
+
public FSLeafQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
super(name, scheduler, parent);
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
+ activeUsersManager = new ActiveUsersManager(getMetrics());
}
public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -245,4 +249,9 @@ public class FSLeafQueue extends FSQueue
public int getNumRunnableApps() {
return runnableAppScheds.size();
}
+
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ return activeUsersManager;
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Feb 7 01:57:21 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@Private
@Unstable
@@ -194,4 +194,10 @@ public class FSParentQueue extends FSQue
childQueue.collectSchedulerApplications(apps);
}
}
+
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ // Should never be called since all applications are submitted to LeafQueues
+ return null;
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Feb 7 01:57:21 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@@ -121,8 +121,7 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler implements
- ResourceScheduler {
+public class FairScheduler extends AbstractYarnScheduler {
private boolean initialized;
private FairSchedulerConfiguration conf;
private Resource minimumAllocation;
@@ -767,7 +766,9 @@ public class FairScheduler extends Abstr
boolean wasRunnable = queue.removeApp(attempt);
if (wasRunnable) {
- maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
+ maxRunningEnforcer.untrackRunnableApp(attempt);
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
+ attempt.getQueue());
} else {
maxRunningEnforcer.untrackNonRunnableApp(attempt);
}
@@ -1356,4 +1357,119 @@ public class FairScheduler extends Abstr
queue.collectSchedulerApplications(apps);
return apps;
}
+
+ @Override
+ public synchronized String moveApplication(ApplicationId appId,
+ String queueName) throws YarnException {
+ SchedulerApplication app = applications.get(appId);
+ if (app == null) {
+ throw new YarnException("App to be moved " + appId + " not found.");
+ }
+ FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt();
+
+ FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
+ FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false);
+ if (targetQueue == null) {
+ throw new YarnException("Target queue " + queueName
+ + " not found or is not a leaf queue.");
+ }
+ if (targetQueue == oldQueue) {
+ return oldQueue.getQueueName();
+ }
+
+ if (oldQueue.getRunnableAppSchedulables().contains(
+ attempt.getAppSchedulable())) {
+ verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
+ }
+
+ executeMove(app, attempt, oldQueue, targetQueue);
+ return targetQueue.getQueueName();
+ }
+
+ private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
+ FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
+ String queueName = targetQueue.getQueueName();
+ ApplicationAttemptId appAttId = app.getApplicationAttemptId();
+ // When checking maxResources and maxRunningApps, we only need to consider
+ // queues below the lowest common ancestor of the two queues because the
+ // total running apps in the common ancestor and above will not be changed.
+ FSQueue lowestCommonAncestor = findLowestCommonAncestorQueue(oldQueue,
+ targetQueue);
+ Resource consumption = app.getCurrentConsumption();
+
+ // Check whether the move would go over maxRunningApps or maxShare
+ FSQueue cur = targetQueue;
+ while (cur != lowestCommonAncestor) {
+ // maxRunningApps
+ if (cur.getNumRunnableApps() == allocConf.getQueueMaxApps(cur.getQueueName())) {
+ throw new YarnException("Moving app attempt " + appAttId + " to queue "
+ + queueName + " would violate queue maxRunningApps constraints on"
+ + " queue " + cur.getQueueName());
+ }
+
+ // maxShare
+ if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption),
+ cur.getMaxShare())) {
+ throw new YarnException("Moving app attempt " + appAttId + " to queue "
+ + queueName + " would violate queue maxShare constraints on"
+ + " queue " + cur.getQueueName());
+ }
+
+ cur = cur.getParent();
+ }
+ }
+
+ /**
+ * Helper for moveApplication, which is synchronized, so all operations will
+ * be atomic.
+ */
+ private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
+ FSLeafQueue oldQueue, FSLeafQueue newQueue) {
+ boolean wasRunnable = oldQueue.removeApp(attempt);
+ // if app was not runnable before, it may be runnable now
+ boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
+ attempt.getUser());
+ if (wasRunnable && !nowRunnable) {
+ throw new IllegalStateException("Should have already verified that app "
+ + attempt.getApplicationId() + " would be runnable in new queue");
+ }
+
+ if (wasRunnable) {
+ maxRunningEnforcer.untrackRunnableApp(attempt);
+ } else if (nowRunnable) {
+ // App has changed from non-runnable to runnable
+ maxRunningEnforcer.untrackNonRunnableApp(attempt);
+ }
+
+ attempt.move(newQueue); // This updates all the metrics
+ app.setQueue(newQueue);
+ newQueue.addApp(attempt, nowRunnable);
+
+ if (nowRunnable) {
+ maxRunningEnforcer.trackRunnableApp(attempt);
+ }
+ if (wasRunnable) {
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue);
+ }
+ }
+
+ /**
+ * Finds the deepest queue that is an ancestor of both given queues by
+ * comparing their period-separated names character by character.
+ *
+ * @param queue1 first queue
+ * @param queue2 second queue
+ * @return the queue looked up under the longest common name prefix that ends
+ * just before a shared period, or queue1 when the two names are identical
+ */
+ private FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
+ // Because queue names include ancestors, separated by periods, we can find
+ // the lowest common ancestors by going from the start of the names until
+ // there's a character that doesn't match.
+ String name1 = queue1.getName();
+ String name2 = queue2.getName();
+ // We keep track of the last period we encounter to avoid returning root.apple
+ // when the queues are root.applepie and root.appletart
+ int lastPeriodIndex = -1;
+ for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) {
+ if (name1.length() <= i || name2.length() <= i ||
+ name1.charAt(i) != name2.charAt(i)) {
+ // The ancestor's name is the prefix ending just before the last shared
+ // period. substring(lastPeriodIndex) would instead yield the trailing
+ // ".child" suffix rather than the ancestor's name.
+ // NOTE(review): assumes both names share at least one leading component
+ // (e.g. "root."), so lastPeriodIndex >= 0 here -- confirm for callers.
+ return queueMgr.getQueue(name1.substring(0, lastPeriodIndex));
+ } else if (name1.charAt(i) == '.') {
+ lastPeriodIndex = i;
+ }
+ }
+ return queue1; // names are identical
+ }
}
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Fri Feb 7 01:57:21 2014
@@ -105,26 +105,15 @@ public class MaxRunningAppsEnforcer {
}
/**
- * Updates the relevant tracking variables after a runnable app with the given
- * queue and user has been removed. Checks to see whether any other applications
- * are now runnable and makes them so.
+ * Checks to see whether any other applications are runnable now that the
+ * given application has been removed from the given queue, and makes them so.
*
* Runs in O(n log(n)) where n is the number of queues that are under the
* highest queue that went from having no slack to having slack.
*/
- public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
+ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
- // Update usersRunnableApps
- String user = app.getUser();
- int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
- if (newUserNumRunning == 0) {
- usersNumRunnableApps.remove(user);
- } else {
- usersNumRunnableApps.put(user, newUserNumRunning);
- }
-
- // Update runnable app bookkeeping for queues:
// childqueueX might have no pending apps itself, but if a queue higher up
// in the hierarchy parentqueueY has a maxRunningApps set, an app completion
// in childqueueX could allow an app in some other distant child of
@@ -133,16 +122,14 @@ public class MaxRunningAppsEnforcer {
// the queue was already at its max before the removal.
// Thus we find the ancestor queue highest in the tree
// that was at its maxRunningApps before the removal.
- FSLeafQueue queue = app.getQueue();
FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
FSParentQueue parent = queue.getParent();
while (parent != null) {
if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent
- .getName())) {
+ .getName()) - 1) {
highestQueueWithAppsNowRunnable = parent;
}
- parent.decrementRunnableApps();
parent = parent.getParent();
}
@@ -157,7 +144,12 @@ public class MaxRunningAppsEnforcer {
gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
appsNowMaybeRunnable);
}
- if (newUserNumRunning == allocConf.getUserMaxApps(user) - 1) {
+ String user = app.getUser();
+ Integer userNumRunning = usersNumRunnableApps.get(user);
+ if (userNumRunning == null) {
+ userNumRunning = 0;
+ }
+ if (userNumRunning == allocConf.getUserMaxApps(user) - 1) {
List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
if (userWaitingApps != null) {
appsNowMaybeRunnable.add(userWaitingApps);
@@ -209,6 +201,29 @@ public class MaxRunningAppsEnforcer {
}
/**
+ * Updates the relevant tracking variables after a runnable app with the given
+ * queue and user has been removed.
+ */
+ public void untrackRunnableApp(FSSchedulerApp app) {
+ // Update usersRunnableApps
+ String user = app.getUser();
+ int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
+ if (newUserNumRunning == 0) {
+ usersNumRunnableApps.remove(user);
+ } else {
+ usersNumRunnableApps.put(user, newUserNumRunning);
+ }
+
+ // Update runnable app bookkeeping for queues
+ FSLeafQueue queue = app.getQueue();
+ FSParentQueue parent = queue.getParent();
+ while (parent != null) {
+ parent.decrementRunnableApps();
+ parent = parent.getParent();
+ }
+ }
+
+ /**
* Stops tracking the given non-runnable app
*/
public void untrackNonRunnableApp(FSSchedulerApp app) {
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1565515&r1=1565514&r2=1565515&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Feb 7 01:57:21 2014
@@ -77,7 +77,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -106,7 +105,7 @@ import com.google.common.annotations.Vis
@Evolving
@SuppressWarnings("unchecked")
public class FifoScheduler extends AbstractYarnScheduler implements
- ResourceScheduler, Configurable {
+ Configurable {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -184,6 +183,11 @@ public class FifoScheduler extends Abstr
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return getQueueAcls().get(acl).isUserAllowed(user);
}
+
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ return activeUsersManager;
+ }
};
@Override