You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [10/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Aug 19 23:49:39 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
@@ -79,6 +81,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@@ -88,6 +91,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -107,12 +111,15 @@ public class ApplicationMasterService ex
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
+ private final AllocateResponse shutdown =
+ recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
+ this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
this.resync.setAMCommand(AMCommand.AM_RESYNC);
this.rmContext = rmContext;
}
@@ -123,6 +130,7 @@ public class ApplicationMasterService ex
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress masterServiceAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
@@ -155,7 +163,9 @@ public class ApplicationMasterService ex
this.server.start();
this.bindAddress =
- conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
server.getListenerAddress());
super.serviceStart();
}
@@ -182,7 +192,7 @@ public class ApplicationMasterService ex
return result;
}
- private ApplicationAttemptId authorizeRequest()
+ private AMRMTokenIdentifier authorizeRequest()
throws YarnException {
UserGroupInformation remoteUgi;
@@ -219,7 +229,7 @@ public class ApplicationMasterService ex
throw RPCUtil.getRemoteException(message);
}
- return appTokenIdentifier.getApplicationAttemptId();
+ return appTokenIdentifier;
}
@Override
@@ -227,7 +237,9 @@ public class ApplicationMasterService ex
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+ ApplicationAttemptId applicationAttemptId =
+ amrmTokenIdentifier.getApplicationAttemptId();
ApplicationId appID = applicationAttemptId.getApplicationId();
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
@@ -298,9 +310,12 @@ public class ApplicationMasterService ex
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) {
try {
- nmTokens.add(rmContext.getNMTokenSecretManager()
- .createAndGetNMToken(app.getUser(), applicationAttemptId,
- container));
+ NMToken token = rmContext.getNMTokenSecretManager()
+ .createAndGetNMToken(app.getUser(), applicationAttemptId,
+ container);
+ if (null != token) {
+ nmTokens.add(token);
+ }
} catch (IllegalArgumentException e) {
// if it's a DNS issue, throw UnknowHostException directly and that
// will be automatically retried by RMProxy in RPC layer.
@@ -323,7 +338,8 @@ public class ApplicationMasterService ex
FinishApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = authorizeRequest();
+ ApplicationAttemptId applicationAttemptId =
+ authorizeRequest().getApplicationAttemptId();
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
if (lock == null) {
@@ -343,9 +359,9 @@ public class ApplicationMasterService ex
AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
message, applicationAttemptId.getApplicationId(),
applicationAttemptId);
- throw new InvalidApplicationMasterRequestException(message);
+ throw new ApplicationMasterNotRegisteredException(message);
}
-
+
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
RMApp rmApp =
@@ -398,7 +414,10 @@ public class ApplicationMasterService ex
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
- ApplicationAttemptId appAttemptId = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+
+ ApplicationAttemptId appAttemptId =
+ amrmTokenIdentifier.getApplicationAttemptId();
this.amLivelinessMonitor.receivedPing(appAttemptId);
@@ -406,22 +425,23 @@ public class ApplicationMasterService ex
AllocateResponseLock lock = responseMap.get(appAttemptId);
if (lock == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
- return resync;
+ return shutdown;
}
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
- "Application Master is trying to allocate before registering for: "
- + appAttemptId.getApplicationId();
- LOG.error(message);
+ "Application Master is not registered for known application: "
+ + appAttemptId.getApplicationId()
+ + ". Let AM resync.";
+ LOG.info(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getUser(), AuditConstants.REGISTER_AM, "",
"ApplicationMasterService", message,
appAttemptId.getApplicationId(),
appAttemptId);
- throw new InvalidApplicationMasterRequestException(message);
+ return resync;
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
@@ -546,6 +566,23 @@ public class ApplicationMasterService ex
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
+ // update AMRMToken if the token is rolled over
+ MasterKeyData nextMasterKey =
+ this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
+
+ if (nextMasterKey != null
+ && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+ .getKeyId()) {
+ Token<AMRMTokenIdentifier> amrmToken =
+ rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttemptId);
+ ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
+ allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+ .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
+ .toString(), amrmToken.getPassword(), amrmToken.getService()
+ .toString()));
+ }
+
/*
* As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Aug 19 23:49:39 2014
@@ -199,7 +199,9 @@ public class ClientRMService extends Abs
}
this.server.start();
- clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
+ clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
server.getListenerAddress());
super.serviceStart();
}
@@ -213,7 +215,9 @@ public class ClientRMService extends Abs
}
InetSocketAddress getBindAddress(Configuration conf) {
- return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ return conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
@@ -919,7 +923,7 @@ public class ClientRMService extends Abs
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
- String user = getRenewerForToken(token);
+ String user = UserGroupInformation.getCurrentUser().getUserName();
rmDTSecretManager.cancelToken(token, user);
return Records.newRecord(CancelDelegationTokenResponse.class);
} catch (IOException e) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java Tue Aug 19 23:49:39 2014
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException;
@@ -60,7 +61,7 @@ public class EmbeddedElectorService exte
}
@Override
- protected synchronized void serviceInit(Configuration conf)
+ protected void serviceInit(Configuration conf)
throws Exception {
conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
@@ -85,8 +86,11 @@ public class EmbeddedElectorService exte
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+ int maxRetryNum = conf.getInt(
+ CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
+ CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
- electionZNode, zkAcls, zkAuths, this);
+ electionZNode, zkAcls, zkAuths, this, maxRetryNum);
elector.ensureParentZNode();
if (!isParentZnodeSafe(clusterId)) {
@@ -98,20 +102,20 @@ public class EmbeddedElectorService exte
}
@Override
- protected synchronized void serviceStart() throws Exception {
+ protected void serviceStart() throws Exception {
elector.joinElection(localActiveNodeInfo);
super.serviceStart();
}
@Override
- protected synchronized void serviceStop() throws Exception {
+ protected void serviceStop() throws Exception {
elector.quitElection(false);
elector.terminateConnection();
super.serviceStop();
}
@Override
- public synchronized void becomeActive() throws ServiceFailedException {
+ public void becomeActive() throws ServiceFailedException {
try {
rmContext.getRMAdminService().transitionToActive(req);
} catch (Exception e) {
@@ -120,7 +124,7 @@ public class EmbeddedElectorService exte
}
@Override
- public synchronized void becomeStandby() {
+ public void becomeStandby() {
try {
rmContext.getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
@@ -139,13 +143,13 @@ public class EmbeddedElectorService exte
@SuppressWarnings(value = "unchecked")
@Override
- public synchronized void notifyFatalError(String errorMessage) {
+ public void notifyFatalError(String errorMessage) {
rmContext.getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
}
@Override
- public synchronized void fenceOldActive(byte[] oldActiveData) {
+ public void fenceOldActive(byte[] oldActiveData) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request to fence old active being ignored, " +
"as embedded leader election doesn't support fencing");
@@ -162,7 +166,7 @@ public class EmbeddedElectorService exte
.toByteArray();
}
- private synchronized boolean isParentZnodeSafe(String clusterId)
+ private boolean isParentZnodeSafe(String clusterId)
throws InterruptedException, IOException, KeeperException {
byte[] data;
try {
@@ -190,4 +194,9 @@ public class EmbeddedElectorService exte
}
return true;
}
+
+ public void resetLeaderElection() {
+ elector.quitElection(false);
+ elector.joinElection(localActiveNodeInfo);
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Tue Aug 19 23:49:39 2014
@@ -99,4 +99,8 @@ public interface RMContext {
RMApplicationHistoryWriter rmApplicationHistoryWriter);
ConfigurationProvider getConfigurationProvider();
+
+ boolean isWorkPreservingRecoveryEnabled();
+
+ int getEpoch();
}
\ No newline at end of file
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Tue Aug 19 23:49:39 2014
@@ -60,6 +60,7 @@ public class RMContextImpl implements RM
= new ConcurrentHashMap<String, RMNode>();
private boolean isHAEnabled;
+ private boolean isWorkPreservingRecoveryEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
@@ -81,6 +82,7 @@ public class RMContextImpl implements RM
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private ConfigurationProvider configurationProvider;
+ private int epoch;
/**
* Default constructor. To be used in conjunction with setter methods for
@@ -329,6 +331,15 @@ public class RMContextImpl implements RM
}
}
+ public void setWorkPreservingRecoveryEnabled(boolean enabled) {
+ this.isWorkPreservingRecoveryEnabled = enabled;
+ }
+
+ @Override
+ public boolean isWorkPreservingRecoveryEnabled() {
+ return this.isWorkPreservingRecoveryEnabled;
+ }
+
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
@@ -349,4 +360,13 @@ public class RMContextImpl implements RM
ConfigurationProvider configurationProvider) {
this.configurationProvider = configurationProvider;
}
+
+ @Override
+ public int getEpoch() {
+ return this.epoch;
+ }
+
+ void setEpoch(int epoch) {
+ this.epoch = epoch;
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java Tue Aug 19 23:49:39 2014
@@ -60,7 +60,7 @@ public class RMSecretManagerService exte
clientToAMSecretManager = createClientToAMTokenSecretManager();
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
- amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
+ amRmTokenSecretManager = createAMRMTokenSecretManager(conf, this.rmContext);
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
rmDTSecretManager =
@@ -115,8 +115,8 @@ public class RMSecretManagerService exte
}
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
- Configuration conf) {
- return new AMRMTokenSecretManager(conf);
+ Configuration conf, RMContext rmContext) {
+ return new AMRMTokenSecretManager(conf, rmContext);
}
protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Utility methods to aid serving RM data through the REST and RPC APIs
@@ -225,4 +228,13 @@ public class RMServerUtils {
}
}
+ /**
+ * Statically defined dummy ApplicationResourceUsageReport. Used as
+ * a return value when a valid report cannot be found.
+ */
+ public static final ApplicationResourceUsageReport
+ DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
+ BuilderUtils.newApplicationResourceUsageReport(-1, -1,
+ Resources.createResource(-1, -1), Resources.createResource(-1, -1),
+ Resources.createResource(-1, -1));
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -32,11 +33,14 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
@@ -88,8 +92,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMAuthenticationHandler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -150,7 +157,8 @@ public class ResourceManager extends Com
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
- private String webAppAddress;
+ @VisibleForTesting
+ protected String webAppAddress;
private ConfigurationProvider configurationProvider = null;
/** End of Active services */
@@ -225,7 +233,9 @@ public class ResourceManager extends Com
}
createAndInitActiveServices();
- webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(this.conf);
+ webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
+ YarnConfiguration.RM_BIND_HOST,
+ WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
this.rmLoginUGI = UserGroupInformation.getCurrentUser();
@@ -327,7 +337,7 @@ public class ResourceManager extends Com
* RMActiveServices handles all the Active services in the RM.
*/
@Private
- class RMActiveServices extends CompositeService {
+ public class RMActiveServices extends CompositeService {
private DelegationTokenRenewer delegationTokenRenewer;
private EventHandler<SchedulerEvent> schedulerDispatcher;
@@ -364,9 +374,15 @@ public class ResourceManager extends Com
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
- if(isRecoveryEnabled) {
+ if (isRecoveryEnabled) {
recoveryEnabled = true;
- rmStore = RMStateStoreFactory.getStore(conf);
+ rmStore = RMStateStoreFactory.getStore(conf);
+ boolean isWorkPreservingRecoveryEnabled =
+ conf.getBoolean(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
+ rmContext
+ .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
@@ -401,6 +417,8 @@ public class ResourceManager extends Com
// Initialize the scheduler
scheduler = createScheduler();
+ scheduler.setRMContext(rmContext);
+ addIfService(scheduler);
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +447,6 @@ public class ResourceManager extends Com
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
- try {
- scheduler.reinitialize(conf, rmContext);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to initialize scheduler", ioe);
- }
-
// creating monitors that handle preemption
createPolicyMonitors();
@@ -451,7 +463,6 @@ public class ResourceManager extends Com
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
clientRM = createClientRMService();
- rmContext.setClientRMService(clientRM);
addService(clientRM);
rmContext.setClientRMService(clientRM);
@@ -480,6 +491,9 @@ public class ResourceManager extends Com
if(recoveryEnabled) {
try {
rmStore.checkVersion();
+ if (rmContext.isWorkPreservingRecoveryEnabled()) {
+ rmContext.setEpoch(rmStore.getAndIncrementEpoch());
+ }
RMState state = rmStore.loadState();
recover(state);
} catch (Exception e) {
@@ -524,11 +538,9 @@ public class ResourceManager extends Com
(PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
- policy.init(conf, rmContext.getDispatcher().getEventHandler(),
- (PreemptableResourceScheduler) scheduler);
// periodically check whether we need to take action to guarantee
// constraints
- SchedulingMonitor mon = new SchedulingMonitor(policy);
+ SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
addService(mon);
}
} else {
@@ -664,6 +676,7 @@ public class ResourceManager extends Com
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
rm.transitionToStandby(true);
+ rm.adminService.resetLeaderElection();
return;
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.");
@@ -785,6 +798,88 @@ public class ResourceManager extends Com
}
protected void startWepApp() {
+
+ // Use the customized yarn filter instead of the standard kerberos filter to
+ // allow users to authenticate using delegation tokens
+ // 4 conditions need to be satisfied -
+ // 1. security is enabled
+ // 2. http auth type is set to kerberos
+ // 3. "yarn.resourcemanager.webapp.use-yarn-filter" override is set to true
+ // 4. hadoop.http.filter.initializers contains AuthenticationFilterInitializer
+
+ Configuration conf = getConfig();
+ boolean useYarnAuthenticationFilter =
+ conf.getBoolean(
+ YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER,
+ YarnConfiguration.DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER);
+ String authPrefix = "hadoop.http.authentication.";
+ String authTypeKey = authPrefix + "type";
+ String filterInitializerConfKey = "hadoop.http.filter.initializers";
+ String actualInitializers = "";
+ Class<?>[] initializersClasses =
+ conf.getClasses(filterInitializerConfKey);
+
+ boolean hasHadoopAuthFilterInitializer = false;
+ boolean hasRMAuthFilterInitializer = false;
+ if (initializersClasses != null) {
+ for (Class<?> initializer : initializersClasses) {
+ if (initializer.getName().equals(
+ AuthenticationFilterInitializer.class.getName())) {
+ hasHadoopAuthFilterInitializer = true;
+ }
+ if (initializer.getName().equals(
+ RMAuthenticationFilterInitializer.class.getName())) {
+ hasRMAuthFilterInitializer = true;
+ }
+ }
+ if (UserGroupInformation.isSecurityEnabled()
+ && useYarnAuthenticationFilter
+ && hasHadoopAuthFilterInitializer
+ && conf.get(authTypeKey, "").equals(
+ KerberosAuthenticationHandler.TYPE)) {
+ ArrayList<String> target = new ArrayList<String>();
+ for (Class<?> filterInitializer : initializersClasses) {
+ if (filterInitializer.getName().equals(
+ AuthenticationFilterInitializer.class.getName())) {
+ if (hasRMAuthFilterInitializer == false) {
+ target.add(RMAuthenticationFilterInitializer.class.getName());
+ }
+ continue;
+ }
+ target.add(filterInitializer.getName());
+ }
+ actualInitializers = StringUtils.join(",", target);
+
+ LOG.info("Using RM authentication filter(kerberos/delegation-token)"
+ + " for RM webapp authentication");
+ RMAuthenticationHandler
+ .setSecretManager(getClientRMService().rmDTSecretManager);
+ String yarnAuthKey =
+ authPrefix + RMAuthenticationFilter.AUTH_HANDLER_PROPERTY;
+ conf.setStrings(yarnAuthKey, RMAuthenticationHandler.class.getName());
+ conf.set(filterInitializerConfKey, actualInitializers);
+ }
+ }
+
+ // if security is not enabled and the default filter initializer has not
+ // been set, set the initializer to include the
+ // RMAuthenticationFilterInitializer which in turn will set up the simple
+ // auth filter.
+
+ String initializers = conf.get(filterInitializerConfKey);
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ if (initializersClasses == null || initializersClasses.length == 0) {
+ conf.set(filterInitializerConfKey,
+ RMAuthenticationFilterInitializer.class.getName());
+ conf.set(authTypeKey, "simple");
+ } else if (initializers.equals(StaticUserWebFilter.class.getName())) {
+ conf.set(filterInitializerConfKey,
+ RMAuthenticationFilterInitializer.class.getName() + ","
+ + initializers);
+ conf.set(authTypeKey, "simple");
+ }
+ }
+
Builder<ApplicationMasterService> builder =
WebApps
.$for("cluster", ApplicationMasterService.class, masterService,
@@ -1022,6 +1117,9 @@ public class ResourceManager extends Com
// recover RMdelegationTokenSecretManager
rmContext.getRMDelegationTokenSecretManager().recover(state);
+ // recover AMRMTokenSecretManager
+ rmContext.getAMRMTokenSecretManager().recover(state);
+
// recover applications
rmAppManager.recover(state);
}
@@ -1031,12 +1129,17 @@ public class ResourceManager extends Com
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
- ResourceManager resourceManager = new ResourceManager();
- ShutdownHookManager.get().addShutdownHook(
- new CompositeServiceShutdownHook(resourceManager),
- SHUTDOWN_HOOK_PRIORITY);
- resourceManager.init(conf);
- resourceManager.start();
+ // If -format-state-store, then delete RMStateStore; else startup normally
+ if (argv.length == 1 && argv[0].equals("-format-state-store")) {
+ deleteRMStateStore(conf);
+ } else {
+ ResourceManager resourceManager = new ResourceManager();
+ ShutdownHookManager.get().addShutdownHook(
+ new CompositeServiceShutdownHook(resourceManager),
+ SHUTDOWN_HOOK_PRIORITY);
+ resourceManager.init(conf);
+ resourceManager.start();
+ }
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);
System.exit(-1);
@@ -1058,6 +1161,9 @@ public class ResourceManager extends Com
((Service)dispatcher).init(this.conf);
((Service)dispatcher).start();
removeService((Service)rmDispatcher);
+ // Need to stop previous rmDispatcher before assigning new dispatcher
+ // otherwise causes "AsyncDispatcher event handler" thread leak
+ ((Service) rmDispatcher).stop();
rmDispatcher = dispatcher;
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
@@ -1073,4 +1179,23 @@ public class ResourceManager extends Com
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
}
+
+ /**
+ * Deletes the RMStateStore
+ *
+ * @param conf
+ * @throws Exception
+ */
+ private static void deleteRMStateStore(Configuration conf) throws Exception {
+ RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
+ rmStore.init(conf);
+ rmStore.start();
+ try {
+ LOG.info("Deleting ResourceManager state store...");
+ rmStore.deleteStore();
+ LOG.info("State store deleted");
+ } finally {
+ rmStore.stop();
+ }
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue Aug 19 23:49:39 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.service.Abstrac
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -120,6 +121,7 @@ public class ResourceTrackerService exte
@Override
protected void serviceInit(Configuration conf) throws Exception {
resourceTrackerAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
@@ -174,9 +176,11 @@ public class ResourceTrackerService exte
}
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
}
-
+
this.server.start();
- conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress());
}
@@ -195,7 +199,7 @@ public class ResourceTrackerService exte
*/
@SuppressWarnings("unchecked")
@VisibleForTesting
- void handleContainerStatus(ContainerStatus containerStatus) {
+ void handleNMContainerStatus(NMContainerStatus containerStatus) {
ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
@@ -219,11 +223,14 @@ public class ResourceTrackerService exte
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
Container masterContainer = rmAppAttempt.getMasterContainer();
if (masterContainer.getId().equals(containerStatus.getContainerId())
- && containerStatus.getState() == ContainerState.COMPLETE) {
+ && containerStatus.getContainerState() == ContainerState.COMPLETE) {
+ ContainerStatus status =
+ ContainerStatus.newInstance(containerStatus.getContainerId(),
+ containerStatus.getContainerState(), containerStatus.getDiagnostics(),
+ containerStatus.getContainerExitStatus());
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
- new RMAppAttemptContainerFinishedEvent(appAttemptId,
- containerStatus);
+ new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
}
@@ -240,13 +247,6 @@ public class ResourceTrackerService exte
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
- if (!request.getContainerStatuses().isEmpty()) {
- LOG.info("received container statuses on node manager register :"
- + request.getContainerStatuses());
- for (ContainerStatus containerStatus : request.getContainerStatuses()) {
- handleContainerStatus(containerStatus);
- }
- }
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
@@ -305,17 +305,31 @@ public class ResourceTrackerService exte
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+ new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
+ request.getRunningApplications()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeReconnectEvent(nodeId, rmNode));
+ new RMNodeReconnectEvent(nodeId, rmNode,
+ request.getRunningApplications()));
}
// On every node manager register we will be clearing NMToken keys if
// present for any running application.
this.nmTokenSecretManager.removeNodeKey(nodeId);
this.nmLivelinessMonitor.register(nodeId);
+
+ // Handle received container statuses; these should be processed after the new
+ // RMNode is inserted
+ if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+ if (!request.getNMContainerStatuses().isEmpty()) {
+ LOG.info("received container statuses on node manager register :"
+ + request.getNMContainerStatuses());
+ for (NMContainerStatus status : request.getNMContainerStatuses()) {
+ handleNMContainerStatus(status);
+ }
+ }
+ }
String message =
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java Tue Aug 19 23:49:39 2014
@@ -269,7 +269,7 @@ public class RMApplicationHistoryWriter
new WritingContainerStartEvent(container.getContainerId(),
ContainerStartData.newInstance(container.getContainerId(),
container.getAllocatedResource(), container.getAllocatedNode(),
- container.getAllocatedPriority(), container.getStartTime())));
+ container.getAllocatedPriority(), container.getCreationTime())));
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Tue Aug 19 23:49:39 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -226,7 +227,7 @@ public class AMLauncher implements Runna
}
// Add AMRMToken
- Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
+ Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
if (amrmToken != null) {
credentials.addToken(amrmToken.getService(), amrmToken);
}
@@ -236,8 +237,12 @@ public class AMLauncher implements Runna
}
@VisibleForTesting
- protected Token<AMRMTokenIdentifier> getAMRMToken() {
- return application.getAMRMToken();
+ protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
+ Token<AMRMTokenIdentifier> amrmToken =
+ this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ application.getAppAttemptId());
+ ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
+ return amrmToken;
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import com.google.common.annotations.VisibleForTesting;
@@ -34,18 +36,29 @@ public class SchedulingMonitor extends A
private Thread checkerThread;
private volatile boolean stopped;
private long monitorInterval;
+ private RMContext rmContext;
- public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
+ public SchedulingMonitor(RMContext rmContext,
+ SchedulingEditPolicy scheduleEditPolicy) {
super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
this.scheduleEditPolicy = scheduleEditPolicy;
- this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+ this.rmContext = rmContext;
}
public long getMonitorInterval() {
return monitorInterval;
}
+
+ @VisibleForTesting
+ public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
+ return scheduleEditPolicy;
+ }
+ @SuppressWarnings("unchecked")
public void serviceInit(Configuration conf) throws Exception {
+ scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+ (PreemptableResourceScheduler) rmContext.getScheduler());
+ this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -110,7 +111,7 @@ public class ProportionalCapacityPreempt
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
- //the dispatcher to send preempt and kill events
+ // the dispatcher to send preempt and kill events
public EventHandler<ContainerPreemptEvent> dispatcher;
private final Clock clock;
@@ -164,12 +165,17 @@ public class ProportionalCapacityPreempt
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();
}
+
+ @VisibleForTesting
+ public ResourceCalculator getResourceCalculator() {
+ return rc;
+ }
@Override
public void editSchedule(){
CSQueue root = scheduler.getRootQueue();
Resource clusterResources =
- Resources.clone(scheduler.getClusterResources());
+ Resources.clone(scheduler.getClusterResource());
containerBasedPreemptOrKill(root, clusterResources);
}
@@ -202,7 +208,9 @@ public class ProportionalCapacityPreempt
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
getContainersToPreempt(queues, clusterResources);
- logToCSV(queues);
+ if (LOG.isDebugEnabled()) {
+ logToCSV(queues);
+ }
// if we are in observeOnly mode return before any action is taken
if (observeOnly) {
@@ -293,34 +301,31 @@ public class ProportionalCapacityPreempt
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
- //assign all cluster resources until no more demand, or no resources are left
- while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
- unassigned, Resources.none())) {
- Resource wQassigned = Resource.newInstance(0, 0);
-
- // we compute normalizedGuarantees capacity based on currently active
- // queues
- resetCapacity(rc, unassigned, qAlloc);
-
- // offer for each queue their capacity first and in following invocations
- // their share of over-capacity
- for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
- TempQueue sub = i.next();
- Resource wQavail =
- Resources.multiply(unassigned, sub.normalizedGuarantee);
- Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
- Resource wQdone = Resources.subtract(wQavail, wQidle);
- // if the queue returned a value > 0 it means it is fully satisfied
- // and it is removed from the list of active queues qAlloc
- if (!Resources.greaterThan(rc, tot_guarant,
- wQdone, Resources.none())) {
- i.remove();
- }
- Resources.addTo(wQassigned, wQdone);
+ // group queues based on whether they have non-zero guaranteed capacity
+ Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
+ Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
+
+ for (TempQueue q : qAlloc) {
+ if (Resources
+ .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+ nonZeroGuarQueues.add(q);
+ } else {
+ zeroGuarQueues.add(q);
}
- Resources.subtractFrom(unassigned, wQassigned);
}
+ // first compute the allocation as a fixpoint based on guaranteed capacity
+ computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+ false);
+
+ // if any capacity is left unassigned, distribute it among zero-guarantee
+ // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+ if (!zeroGuarQueues.isEmpty()
+ && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+ computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+ true);
+ }
+
// based on ideal assignment computed above and current assignment we derive
// how much preemption is required overall
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
@@ -353,6 +358,46 @@ public class ProportionalCapacityPreempt
}
}
+
+ /**
+ * Given a set of queues compute the fix-point distribution of unassigned
+ * resources among them. As the pending requests of a queue are exhausted, the
+ * queue is removed from the set and the remaining capacity is redistributed among
+ * remaining queues. The distribution is weighted based on guaranteed
+ * capacity, unless asked to ignoreGuarantee, in which case resources are
+ * distributed uniformly.
+ */
+ private void computeFixpointAllocation(ResourceCalculator rc,
+ Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
+ boolean ignoreGuarantee) {
+ //assign all cluster resources until no more demand, or no resources are left
+ while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+ unassigned, Resources.none())) {
+ Resource wQassigned = Resource.newInstance(0, 0);
+
+ // we compute normalizedGuarantees capacity based on currently active
+ // queues
+ resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
+
+ // offer for each queue their capacity first and in following invocations
+ // their share of over-capacity
+ for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+ TempQueue sub = i.next();
+ Resource wQavail =
+ Resources.multiply(unassigned, sub.normalizedGuarantee);
+ Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+ Resource wQdone = Resources.subtract(wQavail, wQidle);
+ // if the queue returned a value > 0 it means it is fully satisfied
+ // and it is removed from the list of active queues qAlloc
+ if (!Resources.greaterThan(rc, tot_guarant,
+ wQdone, Resources.none())) {
+ i.remove();
+ }
+ Resources.addTo(wQassigned, wQdone);
+ }
+ Resources.subtractFrom(unassigned, wQassigned);
+ }
+ }
/**
* Computes a normalizedGuaranteed capacity based on active queues
@@ -361,14 +406,21 @@ public class ProportionalCapacityPreempt
* @param queues the list of queues to consider
*/
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
- List<TempQueue> queues) {
+ Collection<TempQueue> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
- for (TempQueue q : queues) {
- Resources.addTo(activeCap, q.guaranteed);
- }
- for (TempQueue q : queues) {
- q.normalizedGuarantee = Resources.divide(rc, clusterResource,
- q.guaranteed, activeCap);
+
+ if (ignoreGuar) {
+ for (TempQueue q : queues) {
+ q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
+ }
+ } else {
+ for (TempQueue q : queues) {
+ Resources.addTo(activeCap, q.guaranteed);
+ }
+ for (TempQueue q : queues) {
+ q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+ q.guaranteed, activeCap);
+ }
}
}
@@ -385,8 +437,9 @@ public class ProportionalCapacityPreempt
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
List<TempQueue> queues, Resource clusterResource) {
- Map<ApplicationAttemptId,Set<RMContainer>> list =
+ Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+ List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
for (TempQueue qT : queues) {
// we act only if we are violating balance by more than
@@ -397,26 +450,83 @@ public class ProportionalCapacityPreempt
// accounts for natural termination of containers
Resource resToObtain =
Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+ Resource skippedAMSize = Resource.newInstance(0, 0);
// lock the leafqueue while we scan applications and unreserve
- synchronized(qT.leafQueue) {
- NavigableSet<FiCaSchedulerApp> ns =
- (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+ synchronized (qT.leafQueue) {
+ NavigableSet<FiCaSchedulerApp> ns =
+ (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
qT.actuallyPreempted = Resources.clone(resToObtain);
while (desc.hasNext()) {
FiCaSchedulerApp fc = desc.next();
- if (Resources.lessThanOrEqual(rc, clusterResource,
- resToObtain, Resources.none())) {
+ if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+ Resources.none())) {
break;
}
- list.put(fc.getApplicationAttemptId(),
- preemptFrom(fc, clusterResource, resToObtain));
+ preemptMap.put(
+ fc.getApplicationAttemptId(),
+ preemptFrom(fc, clusterResource, resToObtain,
+ skippedAMContainerlist, skippedAMSize));
}
+ Resource maxAMCapacityForThisQueue = Resources.multiply(
+ Resources.multiply(clusterResource,
+ qT.leafQueue.getAbsoluteCapacity()),
+ qT.leafQueue.getMaxAMResourcePerQueuePercent());
+
+ // Can try preempting AMContainers (still saving at most
+ // maxAMCapacityForThisQueue AMResource's) if more resources are
+ // required to be preempted from this Queue.
+ preemptAMContainers(clusterResource, preemptMap,
+ skippedAMContainerlist, resToObtain, skippedAMSize,
+ maxAMCapacityForThisQueue);
}
}
}
- return list;
+ return preemptMap;
+ }
+
+ /**
+ * As more resources are needed for preemption, saved AMContainers have to be
+ * rescanned. Such AMContainers can be preempted based on resToObtain, but
+ * maxAMCapacityForThisQueue resources will be still retained.
+ *
+ * @param clusterResource
+ * @param preemptMap
+ * @param skippedAMContainerlist
+ * @param resToObtain
+ * @param skippedAMSize
+ * @param maxAMCapacityForThisQueue
+ */
+ private void preemptAMContainers(Resource clusterResource,
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ List<RMContainer> skippedAMContainerlist, Resource resToObtain,
+ Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+ for (RMContainer c : skippedAMContainerlist) {
+ // Got required amount of resources for preemption, can stop now
+ if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+ Resources.none())) {
+ break;
+ }
+ // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
+ // container selection iteration for preemption will be stopped.
+ if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
+ maxAMCapacityForThisQueue)) {
+ break;
+ }
+ Set<RMContainer> contToPrempt = preemptMap.get(c
+ .getApplicationAttemptId());
+ if (null == contToPrempt) {
+ contToPrempt = new HashSet<RMContainer>();
+ preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+ }
+ contToPrempt.add(c);
+
+ Resources.subtractFrom(resToObtain, c.getContainer().getResource());
+ Resources.subtractFrom(skippedAMSize, c.getContainer()
+ .getResource());
+ }
+ skippedAMContainerlist.clear();
}
/**
@@ -428,8 +538,9 @@ public class ProportionalCapacityPreempt
* @param rsrcPreempt
* @return
*/
- private Set<RMContainer> preemptFrom(
- FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+ private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
+ Resource clusterResource, Resource rsrcPreempt,
+ List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
Set<RMContainer> ret = new HashSet<RMContainer>();
ApplicationAttemptId appId = app.getApplicationAttemptId();
@@ -461,6 +572,12 @@ public class ProportionalCapacityPreempt
rsrcPreempt, Resources.none())) {
return ret;
}
+ // Skip AM Container from preemption for now.
+ if (c.isAMContainer()) {
+ skippedAMContainerlist.add(c);
+ Resources.addTo(skippedAMSize, c.getContainer().getResource());
+ continue;
+ }
ret.add(c);
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
}
@@ -515,18 +632,25 @@ public class ProportionalCapacityPreempt
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
TempQueue ret;
synchronized (root) {
- float absUsed = root.getAbsoluteUsedCapacity();
+ String queueName = root.getQueueName();
+ float absUsed = root.getAbsoluteUsedCapacity();
+ float absCap = root.getAbsoluteCapacity();
+ float absMaxCap = root.getAbsoluteMaximumCapacity();
+
Resource current = Resources.multiply(clusterResources, absUsed);
- Resource guaranteed =
- Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+ Resource guaranteed = Resources.multiply(clusterResources, absCap);
+ Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
if (root instanceof LeafQueue) {
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
- ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+ ret = new TempQueue(queueName, current, pending, guaranteed,
+ maxCapacity);
+
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
- ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+ ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
+ maxCapacity);
for (CSQueue c : root.getChildQueues()) {
ret.addChild(cloneQueues(c, clusterResources));
}
@@ -551,7 +675,7 @@ public class ProportionalCapacityPreempt
sb.append(", ");
tq.appendLogString(sb);
}
- LOG.info(sb.toString());
+ LOG.debug(sb.toString());
}
/**
@@ -563,6 +687,7 @@ public class ProportionalCapacityPreempt
final Resource current;
final Resource pending;
final Resource guaranteed;
+ final Resource maxCapacity;
Resource idealAssigned;
Resource toBePreempted;
Resource actuallyPreempted;
@@ -573,11 +698,12 @@ public class ProportionalCapacityPreempt
LeafQueue leafQueue;
TempQueue(String queueName, Resource current, Resource pending,
- Resource guaranteed) {
+ Resource guaranteed, Resource maxCapacity) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
this.guaranteed = guaranteed;
+ this.maxCapacity = maxCapacity;
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyPreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
@@ -614,12 +740,12 @@ public class ProportionalCapacityPreempt
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource) {
- // remain = avail - min(avail, current + pending - assigned)
- Resource accepted = Resources.min(rc, clusterResource,
- avail,
- Resources.subtract(
- Resources.add(current, pending),
- idealAssigned));
+ // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+ Resource accepted =
+ Resources.min(rc, clusterResource,
+ Resources.subtract(maxCapacity, idealAssigned),
+ Resources.min(rc, clusterResource, avail, Resources.subtract(
+ Resources.add(current, pending), idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
@@ -628,13 +754,15 @@ public class ProportionalCapacityPreempt
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("CUR: ").append(current)
+ sb.append(" NAME: " + queueName)
+ .append(" CUR: ").append(current)
.append(" PEN: ").append(pending)
.append(" GAR: ").append(guaranteed)
.append(" NORM: ").append(normalizedGuarantee)
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
- .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+ .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+ .append("\n");
return sb.toString();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -43,14 +44,22 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -62,14 +71,20 @@ import com.google.common.annotations.Vis
* FileSystem interface. Does not use directories so that simple key-value
* stores can be used. The retry policy for the real filesystem client must be
* configured separately to enable retry of filesystem operations when needed.
+ *
+ * Changes from 1.1 to 1.2: AMRMTokenSecretManager state is now saved
+ * separately, storing both the currentMasterKey and the nextMasterKey.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
*/
public class FileSystemRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
- protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
- .newInstance(1, 0);
+ protected static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(1, 2);
+ protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
+ "AMRMTokenSecretManagerNode";
protected FileSystem fs;
@@ -83,6 +98,7 @@ public class FileSystemRMStateStore exte
@VisibleForTesting
Path fsWorkingPath;
+ Path amrmTokenSecretManagerRoot;
@Override
public synchronized void initInternal(Configuration conf)
throws Exception{
@@ -90,6 +106,8 @@ public class FileSystemRMStateStore exte
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
+ amrmTokenSecretManagerRoot =
+ new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
}
@Override
@@ -107,6 +125,7 @@ public class FileSystemRMStateStore exte
fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot);
+ fs.mkdirs(amrmTokenSecretManagerRoot);
}
@Override
@@ -115,18 +134,18 @@ public class FileSystemRMStateStore exte
}
@Override
- protected RMStateVersion getCurrentVersion() {
+ protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@Override
- protected synchronized RMStateVersion loadVersion() throws Exception {
+ protected synchronized Version loadVersion() throws Exception {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
if (fs.exists(versionNodePath)) {
FileStatus status = fs.getFileStatus(versionNodePath);
byte[] data = readFile(versionNodePath, status.getLen());
- RMStateVersion version =
- new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
return null;
@@ -136,14 +155,37 @@ public class FileSystemRMStateStore exte
protected synchronized void storeVersion() throws Exception {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
byte[] data =
- ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+ ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (fs.exists(versionNodePath)) {
updateFile(versionNodePath, data);
} else {
writeFile(versionNodePath, data);
}
}
-
+
+ @Override
+ public synchronized int getAndIncrementEpoch() throws Exception {
+ Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
+ int currentEpoch = 0;
+ if (fs.exists(epochNodePath)) {
+ // load current epoch
+ FileStatus status = fs.getFileStatus(epochNodePath);
+ byte[] data = readFile(epochNodePath, status.getLen());
+ Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
+ currentEpoch = epoch.getEpoch();
+ // increment epoch and store it
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ .toByteArray();
+ updateFile(epochNodePath, storeData);
+ } else {
+ // no epoch file yet: return epoch 0 now and store 1 for the next call.
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ .toByteArray();
+ writeFile(epochNodePath, storeData);
+ }
+ return currentEpoch;
+ }
+
@Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
@@ -151,9 +193,32 @@ public class FileSystemRMStateStore exte
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
+ // recover AMRMTokenSecretManager
+ loadAMRMTokenSecretManagerState(rmState);
return rmState;
}
+ private void loadAMRMTokenSecretManagerState(RMState rmState)
+ throws Exception {
+ checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
+ Path amrmTokenSecretManagerStateDataDir =
+ new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
+ FileStatus status;
+ try {
+ status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
+ assert status.isFile();
+ } catch (FileNotFoundException ex) {
+ return;
+ }
+ byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
+ AMRMTokenSecretManagerStatePBImpl stateData =
+ new AMRMTokenSecretManagerStatePBImpl(
+ AMRMTokenSecretManagerStateProto.parseFrom(data));
+ rmState.amrmTokenSecretManagerState =
+ AMRMTokenSecretManagerState.newInstance(
+ stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
+ }
+
private void loadRMAppState(RMState rmState) throws Exception {
try {
List<ApplicationAttemptState> attempts =
@@ -214,7 +279,8 @@ public class FileSystemRMStateStore exte
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus());
+ attemptStateData.getFinalApplicationStatus(),
+ attemptStateData.getAMContainerExitStatus());
// assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId());
@@ -314,7 +380,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
fs.mkdirs(appDirPath);
@@ -334,7 +400,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
@@ -354,7 +420,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@@ -375,7 +441,7 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@@ -485,6 +551,13 @@ public class FileSystemRMStateStore exte
deleteFile(nodeCreatePath);
}
+ @Override
+ public synchronized void deleteStore() throws IOException {
+ if (fs.exists(rootDirPath)) {
+ fs.delete(rootDirPath, true);
+ }
+ }
+
private Path getAppDir(Path root, String appId) {
return getNodePath(root, appId);
}
@@ -560,4 +633,25 @@ public class FileSystemRMStateStore exte
return new Path(root, nodeName);
}
+ @Override
+ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+ boolean isUpdate){
+ Path nodeCreatePath =
+ getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
+ AMRMTokenSecretManagerState data =
+ AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+ byte[] stateData = data.getProto().toByteArray();
+ try {
+ if (isUpdate) {
+ updateFile(nodeCreatePath, stateData);
+ } else {
+ writeFile(nodeCreatePath, stateData);
+ }
+ } catch (Exception ex) {
+ LOG.info("Error storing info for AMRMTokenSecretManager", ex);
+ notifyStoreOperationFailed(ex);
+ }
+ }
+
}