You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/19 06:21:45 UTC
svn commit: r1543313 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/...
Author: vinodkv
Date: Tue Nov 19 05:21:45 2013
New Revision: 1543313
URL: http://svn.apache.org/r1543313
Log:
YARN-674. Fixed ResourceManager to renew DelegationTokens on submission asynchronously to work around potential slowness in state-store. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1543312 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Nov 19 05:21:45 2013
@@ -89,6 +89,10 @@ Release 2.3.0 - UNRELEASED
ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
vinodkv)
+ YARN-674. Fixed ResourceManager to renew DelegationTokens on submission
+ asynchronously to work around potential slowness in state-store. (Omkar Vinit
+ Joshi via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Nov 19 05:21:45 2013
@@ -504,6 +504,11 @@ public class YarnConfiguration extends C
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
+
+ /** Delegation Token renewer thread count */
+ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
+ RM_PREFIX + "delegation-token-renewer.thread-count";
+ public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Nov 19 05:21:45 2013
@@ -318,7 +318,7 @@ public class ClientRMService extends Abs
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
- System.currentTimeMillis(), false, user);
+ System.currentTimeMillis(), user, false, null);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Nov 19 05:21:45 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -236,35 +237,63 @@ public class RMAppManager implements Eve
this.applicationACLsManager.removeApplication(removeId);
}
}
-
+
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
- boolean isRecovered, String user) throws YarnException {
+ String user, boolean isRecovered, RMState state) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
- // Validation of the ApplicationSubmissionContext needs to be completed
- // here. Only those fields that are dependent on RM's configuration are
- // checked here as they have to be validated whether they are part of new
- // submission or just being recovered.
+ RMAppImpl application =
+ createAndPopulateNewRMApp(submissionContext, submitTime, user);
- // Check whether AM resource requirements are within required limits
- if (!submissionContext.getUnmanagedAM()) {
- ResourceRequest amReq = BuilderUtils.newResourceRequest(
- RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1);
- try {
- SchedulerUtils.validateResourceRequest(amReq,
- scheduler.getMaximumResourceCapability());
- } catch (InvalidResourceRequestException e) {
- LOG.warn("RM app submission failed in validating AM resource request"
- + " for application " + applicationId, e);
- throw e;
+ if (isRecovered) {
+ recoverApplication(state, application);
+ RMAppState rmAppState =
+ state.getApplicationState().get(applicationId).getState();
+ if (isApplicationInFinalState(rmAppState)) {
+ // We are synchronously moving the application into final state so that
+ // momentarily client will not see this application in NEW state. Also
+ // for finished applications we will avoid renewing tokens.
+ application
+ .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
+ return;
}
}
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials credentials = null;
+ try {
+ credentials = parseCredentials(submissionContext);
+ } catch (Exception e) {
+ LOG.warn(
+ "Unable to parse credentials.", e);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we haven't yet informed the
+ // scheduler about the existence of the application
+ assert application.getState() == RMAppState.NEW;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, e.getMessage()));
+ throw RPCUtil.getRemoteException(e);
+ }
+ this.rmContext.getDelegationTokenRenewer().addApplication(
+ applicationId, credentials,
+ submissionContext.getCancelTokensWhenComplete(), isRecovered);
+ } else {
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId,
+ isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
+ }
+ }
+ private RMAppImpl createAndPopulateNewRMApp(
+ ApplicationSubmissionContext submissionContext,
+ long submitTime, String user)
+ throws YarnException {
+ ApplicationId applicationId = submissionContext.getApplicationId();
+ validateResourceRequest(submissionContext);
// Create RMApp
- RMApp application =
+ RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
@@ -281,35 +310,52 @@ public class RMAppManager implements Eve
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
}
-
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
+ return application;
+ }
+ private void validateResourceRequest(
+ ApplicationSubmissionContext submissionContext)
+ throws InvalidResourceRequestException {
+ // Validation of the ApplicationSubmissionContext needs to be completed
+ // here. Only those fields that are dependent on RM's configuration are
+ // checked here as they have to be validated whether they are part of new
+ // submission or just being recovered.
+
+ // Check whether AM resource requirements are within required limits
+ if (!submissionContext.getUnmanagedAM()) {
+ ResourceRequest amReq = BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getResource(), 1);
+ try {
+ SchedulerUtils.validateResourceRequest(amReq,
+ scheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("RM app submission failed in validating AM resource request"
+ + " for application " + submissionContext.getApplicationId(), e);
+ throw e;
+ }
+ }
+ }
+
+ private void recoverApplication(RMState state, RMAppImpl application)
+ throws YarnException {
try {
- // Setup tokens for renewal
- if (UserGroupInformation.isSecurityEnabled()) {
- this.rmContext.getDelegationTokenRenewer().addApplication(
- applicationId,parseCredentials(submissionContext),
- submissionContext.getCancelTokensWhenComplete()
- );
- }
- } catch (IOException ie) {
- LOG.warn(
- "Unable to add the application to the delegation token renewer.",
- ie);
- // Sending APP_REJECTED is fine, since we assume that the
- // RMApp is in NEW state and thus we havne't yet informed the
- // Scheduler about the existence of the application
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId, ie.getMessage()));
- throw RPCUtil.getRemoteException(ie);
+ application.recover(state);
+ } catch (Exception e) {
+ LOG.error("Error recovering application", e);
+ throw new YarnException(e);
}
+ }
- if (!isRecovered) {
- // All done, start the RMApp
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.START));
+ private boolean isApplicationInFinalState(RMAppState rmAppState) {
+ if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
+ || rmAppState == RMAppState.KILLED) {
+ return true;
+ } else {
+ return false;
}
}
@@ -335,17 +381,9 @@ public class RMAppManager implements Eve
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
LOG.info("Recovering application " + appState.getAppId());
+
submitApplication(appState.getApplicationSubmissionContext(),
- appState.getSubmitTime(), true, appState.getUser());
- // re-populate attempt information in application
- RMAppImpl appImpl =
- (RMAppImpl) rmContext.getRMApps().get(appState.getAppId());
- appImpl.recover(state);
- // Recover the app synchronously, as otherwise client is possible to see
- // the application not recovered before it is actually recovered because
- // ClientRMService is already started at this point of time.
- appImpl.handle(new RMAppEvent(appImpl.getApplicationId(),
- RMAppEventType.RECOVER));
+ appState.getSubmitTime(), appState.getUser(), true, state);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Tue Nov 19 05:21:45 2013
@@ -34,6 +34,10 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -48,10 +52,15 @@ import org.apache.hadoop.service.Abstrac
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Service to renew application delegation tokens.
@@ -72,7 +81,8 @@ public class DelegationTokenRenewer exte
// delegation token canceler thread
private DelegationTokenCancelThread dtCancelThread =
new DelegationTokenCancelThread();
-
+ private ThreadPoolExecutor renewerService;
+
// managing the list of tokens using Map
// appId=>List<tokens>
private Set<DelegationTokenToRenew> delegationTokens =
@@ -84,9 +94,9 @@ public class DelegationTokenRenewer exte
private long tokenRemovalDelayMs;
private Thread delayedRemovalThread;
- private boolean isServiceStarted = false;
- private List<DelegationTokenToRenew> pendingTokenForRenewal =
- new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
+ private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
+ private volatile boolean isServiceStarted;
+ private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
private boolean tokenKeepAliveEnabled;
@@ -102,9 +112,27 @@ public class DelegationTokenRenewer exte
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ renewerService = createNewThreadPoolService(conf);
+ pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
super.serviceInit(conf);
}
+ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
+ int nThreads = conf.getInt(
+ YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
+
+ ThreadFactory tf = new ThreadFactoryBuilder()
+ .setNameFormat("DelegationTokenRenewer #%d")
+ .build();
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor((5 < nThreads ? 5 : nThreads), nThreads, 3L,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ pool.setThreadFactory(tf);
+ pool.allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
@Override
protected void serviceStart() throws Exception {
dtCancelThread.start();
@@ -119,21 +147,36 @@ public class DelegationTokenRenewer exte
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmContext.getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
- // Delegation token renewal is delayed until ClientRMService starts. As
- // it is required to short circuit the token renewal calls.
+ serviceStateLock.writeLock().lock();
isServiceStarted = true;
- renewIfServiceIsStarted(pendingTokenForRenewal);
- pendingTokenForRenewal.clear();
+ serviceStateLock.writeLock().unlock();
+ while(!pendingEventQueue.isEmpty()) {
+ processDelegationTokenRewewerEvent(pendingEventQueue.take());
+ }
super.serviceStart();
}
+ private void processDelegationTokenRewewerEvent(
+ DelegationTokenRenewerEvent evt) {
+ serviceStateLock.readLock().lock();
+ try {
+ if (isServiceStarted) {
+ renewerService.execute(new DelegationTokenRenewerRunnable(evt));
+ } else {
+ pendingEventQueue.add(evt);
+ }
+ } finally {
+ serviceStateLock.readLock().unlock();
+ }
+ }
+
@Override
protected void serviceStop() {
if (renewalTimer != null) {
renewalTimer.cancel();
}
delegationTokens.clear();
-
+ this.renewerService.shutdown();
dtCancelThread.interrupt();
try {
dtCancelThread.join(1000);
@@ -290,47 +333,50 @@ public class DelegationTokenRenewer exte
* @throws IOException
*/
public void addApplication(
- ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
+ ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered) {
+ processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
+ applicationId, ts,
+ shouldCancelAtEnd, isApplicationRecovered));
+ }
+
+ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
throws IOException {
+ ApplicationId applicationId = evt.getApplicationId();
+ Credentials ts = evt.getCredentials();
+ boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
if (ts == null) {
- return; //nothing to add
+ return; // nothing to add
}
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Registering tokens for renewal for:" +
+ LOG.debug("Registering tokens for renewal for:" +
" appId = " + applicationId);
}
-
- Collection <Token<?>> tokens = ts.getAllTokens();
+
+ Collection<Token<?>> tokens = ts.getAllTokens();
long now = System.currentTimeMillis();
-
+
// find tokens for renewal, but don't add timers until we know
// all renewable tokens are valid
// At RM restart it is safe to assume that all the previously added tokens
// are valid
List<DelegationTokenToRenew> tokenList =
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
- for(Token<?> token : tokens) {
+ for (Token<?> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd));
}
}
- if (!tokenList.isEmpty()){
- renewIfServiceIsStarted(tokenList);
- }
- }
-
- protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
- throws IOException {
- if (isServiceStarted) {
+ if (!tokenList.isEmpty()) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
// renewal.
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
renewToken(dtr);
}
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);
setTimerForTokenRenewal(dtr);
if (LOG.isDebugEnabled()) {
@@ -338,11 +384,9 @@ public class DelegationTokenRenewer exte
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
}
}
- } else {
- pendingTokenForRenewal.addAll(dtrs);
}
}
-
+
/**
* Task - to renew a token
*
@@ -449,14 +493,20 @@ public class DelegationTokenRenewer exte
* @param applicationId completed application
*/
public void applicationFinished(ApplicationId applicationId) {
+ processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
+ applicationId,
+ DelegationTokenRenewerEventType.FINISH_APPLICATION));
+ }
+
+ private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) {
if (!tokenKeepAliveEnabled) {
- removeApplicationFromRenewal(applicationId);
+ removeApplicationFromRenewal(evt.getApplicationId());
} else {
- delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+ delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
-
+
/**
* Add a list of applications to the keep alive list. If an appId already
* exists, update it's keep-alive time.
@@ -546,4 +596,111 @@ public class DelegationTokenRenewer exte
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
+
+ /*
+ * This will run as a separate thread and will process individual events. It
+ * is done in this way to make sure that the token renewal as a part of
+ * application submission and token removal as a part of application finish
+ * is asynchronous in nature.
+ */
+ private final class DelegationTokenRenewerRunnable
+ implements Runnable {
+
+ private DelegationTokenRenewerEvent evt;
+
+ public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
+ this.evt = evt;
+ }
+
+ @Override
+ public void run() {
+ if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
+ DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
+ (DelegationTokenRenewerAppSubmitEvent) evt;
+ handleDTRenewerAppSubmitEvent(appSubmitEvt);
+ } else if (evt.getType().equals(
+ DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
+ DelegationTokenRenewer.this.handleAppFinishEvent(evt);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void handleDTRenewerAppSubmitEvent(
+ DelegationTokenRenewerAppSubmitEvent event) {
+ /*
+ * For applications submitted with delegation tokens we are not submitting
+ * the application to scheduler from RMAppManager. Instead we are doing
+ * it from here. The primary goal is to make token renewal as a part of
+ * application submission asynchronous so that client thread is not
+ * blocked during app submission.
+ */
+ try {
+ // Setup tokens for renewal
+ DelegationTokenRenewer.this.handleAppSubmitEvent(event);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(event.getApplicationId(),
+ event.isApplicationRecovered() ? RMAppEventType.RECOVER
+ : RMAppEventType.START));
+ } catch (Throwable t) {
+ LOG.warn(
+ "Unable to add the application to the delegation token renewer.",
+ t);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we haven't yet informed the
+ // Scheduler about the existence of the application
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
+ }
+ }
+ }
+
+ class DelegationTokenRenewerAppSubmitEvent extends
+ DelegationTokenRenewerEvent {
+
+ private Credentials credentials;
+ private boolean shouldCancelAtEnd;
+ private boolean isAppRecovered;
+
+ public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+ Credentials credentails, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered) {
+ super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+ this.credentials = credentails;
+ this.shouldCancelAtEnd = shouldCancelAtEnd;
+ this.isAppRecovered = isApplicationRecovered;
+ }
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ public boolean shouldCancelAtEnd() {
+ return shouldCancelAtEnd;
+ }
+
+ public boolean isApplicationRecovered() {
+ return isAppRecovered;
+ }
+ }
+
+ enum DelegationTokenRenewerEventType {
+ VERIFY_AND_START_APPLICATION,
+ FINISH_APPLICATION
+ }
+
+ class DelegationTokenRenewerEvent extends
+ AbstractEvent<DelegationTokenRenewerEventType> {
+
+ private ApplicationId appId;
+
+ public DelegationTokenRenewerEvent(ApplicationId appId,
+ DelegationTokenRenewerEventType type) {
+ super(type);
+ this.appId = appId;
+ }
+
+ public ApplicationId getApplicationId() {
+ return appId;
+ }
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Tue Nov 19 05:21:45 2013
@@ -172,7 +172,7 @@ public class TestAppManager{
ApplicationSubmissionContext submissionContext, String user)
throws YarnException {
super.submitApplication(submissionContext, System.currentTimeMillis(),
- false, user);
+ user, false, null);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Nov 19 05:21:45 2013
@@ -1009,6 +1009,10 @@ public class TestRMRestart {
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
+ // Need to wait for a while as now token renewal happens on another thread
+ // and is asynchronous in nature.
+ waitForTokensToBeRenewed(rm2);
+
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
@@ -1018,6 +1022,21 @@ public class TestRMRestart {
rm2.stop();
}
+ private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
+ int waitCnt = 20;
+ boolean atleastOneAppInNEWState = true;
+ while (waitCnt-- > 0 && atleastOneAppInNEWState) {
+ atleastOneAppInNEWState = false;
+ for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
+ if (rmApp.getState() == RMAppState.NEW) {
+ Thread.sleep(1000);
+ atleastOneAppInNEWState = true;
+ break;
+ }
+ }
+ }
+ }
+
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Tue Nov 19 05:21:45 2013
@@ -31,13 +31,24 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -46,16 +57,29 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@@ -66,14 +90,18 @@ import org.mockito.stubbing.Answer;
/**
* unit test -
- * tests addition/deletion/cancelation of renewals of delegation tokens
+ * tests addition/deletion/cancellation of renewals of delegation tokens
*
*/
+@SuppressWarnings("rawtypes")
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+ private static BlockingQueue<Event> eventQueue;
+ private static volatile AtomicInteger counter;
+ private static AsyncDispatcher dispatcher;
public static class Renewer extends TokenRenewer {
private static int counter = 0;
private static Token<?> lastRenewed = null;
@@ -143,11 +171,20 @@ public class TestDelegationTokenRenewer
@Before
public void setUp() throws Exception {
+ counter = new AtomicInteger(0);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ eventQueue = new LinkedBlockingQueue<Event>();
+ dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset();
- delegationTokenRenewer = new DelegationTokenRenewer();
+ delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
delegationTokenRenewer.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ delegationTokenRenewer);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
@@ -285,7 +322,7 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTRenewal () throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -316,8 +353,9 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplication(applicationId_0, ts, true);
-
+ delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
+
// first 3 initial renewals + 1 real
int numberOfExpectedRenewals = 3+1;
@@ -355,9 +393,10 @@ public class TestDelegationTokenRenewer
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, true);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -377,8 +416,8 @@ public class TestDelegationTokenRenewer
}
}
- @Test
- public void testInvalidDTWithAddApplication() throws Exception {
+ @Test(timeout=60000)
+ public void testAppRejectionWithCancelledDelegationToken() throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -390,12 +429,21 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
- try {
- delegationTokenRenewer.addApplication(appId, ts, true);
- fail("App submission with a cancelled token should have failed");
- } catch (InvalidToken e) {
- // expected
+ delegationTokenRenewer.addApplication(appId, ts, true, false);
+ int waitCnt = 20;
+ while (waitCnt-- >0) {
+ if (!eventQueue.isEmpty()) {
+ Event evt = eventQueue.take();
+ if (evt.getType() == RMAppEventType.APP_REJECTED) {
+ Assert.assertTrue(
+ ((RMAppEvent) evt).getApplicationId().equals(appId));
+ return;
+ }
+ } else {
+ Thread.sleep(500);
+ }
}
+ fail("App submission with a cancelled token should have failed");
}
/**
@@ -408,7 +456,7 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTRenewalWithNoCancel () throws Exception {
MyFS dfs = (MyFS)FileSystem.get(conf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -425,9 +473,10 @@ public class TestDelegationTokenRenewer
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, false);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -454,9 +503,8 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTKeepAlive1 () throws Exception {
- DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
@@ -465,10 +513,15 @@ public class TestDelegationTokenRenewer
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
+ DelegationTokenRenewer localDtr =
+ createNewDelegationTokenRenewer(lconf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -487,16 +540,25 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
+ waitForEventsToGetProcessed(localDtr);
+ if (!eventQueue.isEmpty()){
+ Event evt = eventQueue.take();
+ if (evt instanceof RMAppEvent) {
+ Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
+ } else {
+ fail("RMAppEvent.START was expected!!");
+ }
+ }
+
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(3000l);
+ waitForEventsToGetProcessed(localDtr);
//Token should still be around. Renewal should not fail.
token1.renew(lconf);
//Allow the keepalive time to run out
- Thread.sleep(6000l);
+ Thread.sleep(10000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
@@ -518,9 +580,8 @@ public class TestDelegationTokenRenewer
* @throws IOException
* @throws URISyntaxException
*/
- @Test
+ @Test(timeout=60000)
public void testDTKeepAlive2() throws Exception {
- DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
@@ -529,10 +590,15 @@ public class TestDelegationTokenRenewer
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
+ DelegationTokenRenewer localDtr =
+ createNewDelegationTokenRenewer(conf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -551,22 +617,18 @@ public class TestDelegationTokenRenewer
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(4000l);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive.
localDtr.updateKeepAliveApplications(Collections
.singletonList(applicationId_0));
//Renewal should not fail.
token1.renew(lconf);
-
//Token should be around after this.
Thread.sleep(4500l);
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
token1.renew(lconf);
-
//Allow the keepalive time to run out
Thread.sleep(3000l);
//The token should have been cancelled at this point. Renewal will fail.
@@ -575,61 +637,127 @@ public class TestDelegationTokenRenewer
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
-
- @Test(timeout=20000)
- public void testConncurrentAddApplication()
- throws IOException, InterruptedException, BrokenBarrierException {
- final CyclicBarrier startBarrier = new CyclicBarrier(2);
- final CyclicBarrier endBarrier = new CyclicBarrier(2);
-
- // this token uses barriers to block during renew
- final Credentials creds1 = new Credentials();
- final Token<?> token1 = mock(Token.class);
- creds1.addToken(new Text("token"), token1);
- doReturn(true).when(token1).isManaged();
- doAnswer(new Answer<Long>() {
- public Long answer(InvocationOnMock invocation)
- throws InterruptedException, BrokenBarrierException {
- startBarrier.await();
- endBarrier.await();
- return Long.MAX_VALUE;
- }}).when(token1).renew(any(Configuration.class));
-
- // this dummy token fakes renewing
- final Credentials creds2 = new Credentials();
- final Token<?> token2 = mock(Token.class);
- creds2.addToken(new Text("token"), token2);
- doReturn(true).when(token2).isManaged();
- doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
-
- // fire up the renewer
- final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
- dtr.init(conf);
- RMContext mockContext = mock(RMContext.class);
- ClientRMService mockClientRMService = mock(ClientRMService.class);
- when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
- InetSocketAddress sockAddr =
- InetSocketAddress.createUnresolved("localhost", 1234);
- when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
- dtr.setRMContext(mockContext);
- dtr.start();
-
- // submit a job that blocks during renewal
- Thread submitThread = new Thread() {
+
+ /**
+ * Builds a DelegationTokenRenewer whose thread pool keeps the supplied
+ * counter in step with outstanding work: the counter is incremented each
+ * time a task is handed to execute() and decremented in afterExecute().
+ * The pool has 5 core / 5 max threads, a 3 second keep-alive and an
+ * unbounded queue.
+ *
+ * NOTE(review): the outer conf argument is never read here; the override's
+ * own conf parameter shadows it and is not read either.
+ *
+ * @param conf not used by this helper
+ * @param counter number of tasks submitted to the pool and not yet finished
+ * @return a renewer instrumented with the counting thread pool
+ */
+ private DelegationTokenRenewer createNewDelegationTokenRenewer(
+ Configuration conf, final AtomicInteger counter) {
+ return new DelegationTokenRenewer() {
+
@Override
- public void run() {
- try {
- dtr.addApplication(mock(ApplicationId.class), creds1, false);
- } catch (IOException e) {}
+ protected ThreadPoolExecutor
+ createNewThreadPoolService(Configuration conf) {
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>()) {
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ counter.decrementAndGet();
+ super.afterExecute(r, t);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ counter.incrementAndGet();
+ super.execute(command);
+ }
+ };
+ return pool;
}
};
- submitThread.start();
-
+ }
+
+ /**
+ * Polls the shared static counter until it is no longer positive, checking
+ * up to 40 times with a 200 ms sleep between checks.
+ *
+ * Returns silently when the attempts run out, so a caller cannot tell a
+ * drained counter from a timeout.
+ *
+ * NOTE(review): dtr is never read; the wait always uses the static counter,
+ * whichever renewer instance is passed in.
+ *
+ * @param dtr not used by this helper
+ * @throws InterruptedException if the sleeping thread is interrupted
+ */
+ private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
+ throws InterruptedException {
+ int wait = 40;
+ while (wait-- > 0
+ && counter.get() > 0) {
+ Thread.sleep(200);
+ }
+ }
+
+ /**
+ * Submits two applications to one renewer while the first token's renew()
+ * is parked between two 2-party barriers. The main thread passes
+ * startBarrier only once that renew() has been entered, then issues the
+ * second addApplication, and finally releases the first renewal through
+ * endBarrier. If either barrier is never reached, the 20 second test
+ * timeout fails the test.
+ *
+ * NOTE(review): unlike the other tests shown in this diff, getDispatcher()
+ * is not stubbed on the mock context here - confirm that is intentional.
+ */
+ @Test(timeout=20000)
+ public void testConcurrentAddApplication()
+ throws IOException, InterruptedException, BrokenBarrierException {
+ final CyclicBarrier startBarrier = new CyclicBarrier(2);
+ final CyclicBarrier endBarrier = new CyclicBarrier(2);
+
+ // this token uses barriers to block during renew
+ final Credentials creds1 = new Credentials();
+ final Token<?> token1 = mock(Token.class);
+ creds1.addToken(new Text("token"), token1);
+ doReturn(true).when(token1).isManaged();
+ doAnswer(new Answer<Long>() {
+ public Long answer(InvocationOnMock invocation)
+ throws InterruptedException, BrokenBarrierException {
+ startBarrier.await();
+ endBarrier.await();
+ return Long.MAX_VALUE;
+ }}).when(token1).renew(any(Configuration.class));
+
+ // this dummy token fakes renewing
+ final Credentials creds2 = new Credentials();
+ final Token<?> token2 = mock(Token.class);
+ creds2.addToken(new Text("token"), token2);
+ doReturn(true).when(token2).isManaged();
+ doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
+
+ // fire up the renewer
+ final DelegationTokenRenewer dtr =
+ createNewDelegationTokenRenewer(conf, counter);
+ dtr.init(conf);
+ RMContext mockContext = mock(RMContext.class);
+ ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ InetSocketAddress sockAddr =
+ InetSocketAddress.createUnresolved("localhost", 1234);
+ when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+ dtr.setRMContext(mockContext);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+ dtr.start();
+ // submit a job that blocks during renewal
+ Thread submitThread = new Thread() {
+ @Override
+ public void run() {
+ dtr.addApplication(mock(ApplicationId.class), creds1, false, false);
+ }
+ };
+ submitThread.start();
+
// wait till 1st submit blocks, then submit another
- startBarrier.await();
- dtr.addApplication(mock(ApplicationId.class), creds2, false);
- // signal 1st to complete
- endBarrier.await();
- submitThread.join();
+ startBarrier.await();
+ dtr.addApplication(mock(ApplicationId.class), creds2, false, false);
+ // signal 1st to complete
+ endBarrier.await();
+ submitThread.join();
+ }
+
+ /**
+ * Submits an application whose AM container carries a token buffer that
+ * is just the bytes of "BOGUS", using a MockRM configured for kerberos
+ * authentication. The submission must be rejected with a YarnException
+ * whose message contains "Bad header found in token storage"; a submission
+ * that returns normally fails the test.
+ *
+ * @throws Exception on any failure other than the expected YarnException
+ */
+ @Test(timeout=20000)
+ public void testAppSubmissionWithInvalidDelegationToken() throws Exception {
+ // Local configuration; shadows the conf field used by the other tests.
+ Configuration conf = new Configuration();
+ conf.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ MockRM rm = new MockRM(conf);
+ // Raw bytes passed as the container's token buffer.
+ ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
+ ContainerLaunchContext amContainer =
+ ContainerLaunchContext.newInstance(
+ new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+ new ArrayList<String>(), new HashMap<String, ByteBuffer>(), tokens,
+ new HashMap<ApplicationAccessType, String>());
+ ApplicationSubmissionContext appSubContext =
+ ApplicationSubmissionContext.newInstance(
+ ApplicationId.newInstance(1234121, 0),
+ "BOGUS", "default", Priority.UNDEFINED, amContainer, false,
+ true, 1, Resource.newInstance(1024, 1), "BOGUS");
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(appSubContext);
+ try {
+ rm.getClientRMService().submitApplication(request);
+ fail("Error was expected.");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Bad header found in token storage"));
+ }
+ }
}