You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/02 06:34:59 UTC
[1/2] hadoop git commit: HDFS-8046. Allow better control of
getContentSummary. Contributed by Kihwal Lee. (cherry picked from commit
285b31e75e51ec8e3a796c2cb0208739368ca9b8)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6.1 31d30e811 -> 752e3da73
HDFS-8046. Allow better control of getContentSummary. Contributed by Kihwal Lee.
(cherry picked from commit 285b31e75e51ec8e3a796c2cb0208739368ca9b8)
(cherry picked from commit 7e622076d41a85fc9a8600fb270564a085f5cd83)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
(cherry picked from commit 1ef5e0b18066ca949adcf4c55a41f186c47e7264)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/619f7938
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/619f7938
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/619f7938
Branch: refs/heads/branch-2.6.1
Commit: 619f7938466e907f335941bbbbd928c6272a0482
Parents: 31d30e8
Author: Kihwal Lee <ki...@apache.org>
Authored: Wed Apr 8 15:39:25 2015 -0500
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 1 21:24:53 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 ++
.../main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +++-
.../server/namenode/ContentSummaryComputationContext.java | 10 +++++++---
.../apache/hadoop/hdfs/server/namenode/FSDirectory.java | 10 +++++++++-
4 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/619f7938/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7efe993..231cc8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -22,6 +22,8 @@ Release 2.6.1 - UNRELEASED
HDFS-7596. NameNode should prune dead storages from storageMap.
(Arpit Agarwal via cnauroth)
+ HDFS-8046. Allow better control of getContentSummary (kihwal)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/619f7938/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index fd313bb..85b740e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -272,7 +272,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
- public static final int DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 0;
+ public static final int DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 5000;
+ public static final String DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_KEY = "dfs.content-summary.sleep-microsec";
+ public static final long DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_DEFAULT = 500;
public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/619f7938/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
index dab64ec..17e16ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
@@ -29,6 +29,8 @@ public class ContentSummaryComputationContext {
private long nextCountLimit = 0;
private long limitPerRun = 0;
private long yieldCount = 0;
+ private long sleepMilliSec = 0;
+ private int sleepNanoSec = 0;
/**
* Constructor
@@ -40,17 +42,19 @@ public class ContentSummaryComputationContext {
* no limit (i.e. no yielding)
*/
public ContentSummaryComputationContext(FSDirectory dir,
- FSNamesystem fsn, long limitPerRun) {
+ FSNamesystem fsn, long limitPerRun, long sleepMicroSec) {
this.dir = dir;
this.fsn = fsn;
this.limitPerRun = limitPerRun;
this.nextCountLimit = limitPerRun;
this.counts = Content.Counts.newInstance();
+ this.sleepMilliSec = sleepMicroSec/1000;
+ this.sleepNanoSec = (int)((sleepMicroSec%1000)*1000);
}
/** Constructor for blocking computation. */
public ContentSummaryComputationContext() {
- this(null, null, 0);
+ this(null, null, 0, 1000);
}
/** Return current yield count */
@@ -101,7 +105,7 @@ public class ContentSummaryComputationContext {
fsn.readUnlock();
try {
- Thread.sleep(1);
+ Thread.sleep(sleepMilliSec, sleepNanoSec);
} catch (InterruptedException ie) {
} finally {
// reacquire
http://git-wip-us.apache.org/repos/asf/hadoop/blob/619f7938/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 95877ab..9fd9699 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -144,6 +144,7 @@ public class FSDirectory implements Closeable {
private final int maxDirItems;
private final int lsLimit; // max list limit
private final int contentCountLimit; // max content summary counts per run
+ private final long contentSleepMicroSec;
private final INodeMap inodeMap; // Synchronized by dirLock
private long yieldCount = 0; // keep track of lock yield count.
private final int inodeXAttrsLimit; //inode xattrs max limit
@@ -204,6 +205,9 @@ public class FSDirectory implements Closeable {
this.contentCountLimit = conf.getInt(
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
+ this.contentSleepMicroSec = conf.getLong(
+ DFSConfigKeys.DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_KEY,
+ DFSConfigKeys.DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_DEFAULT);
// filesystem limits
this.maxComponentLength = conf.getInt(
@@ -252,6 +256,10 @@ public class FSDirectory implements Closeable {
return rootDir;
}
+ long getContentSleepMicroSec() {
+ return contentSleepMicroSec;
+ }
+
/**
* Shutdown the filestore
*/
@@ -2166,7 +2174,7 @@ public class FSDirectory implements Closeable {
ContentSummaryComputationContext cscc =
new ContentSummaryComputationContext(this, getFSNamesystem(),
- contentCountLimit);
+ contentCountLimit, contentCountLimit);
ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
yieldCount += cscc.getYieldCount();
return cs;
[2/2] hadoop git commit: YARN-3055. Fixed ResourceManager's
DelegationTokenRenewer to not stop token renewal of applications part of a
bigger workflow. Contributed by Daryn Sharp.
Posted by vi...@apache.org.
YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token renewal of applications part of a bigger workflow. Contributed by Daryn Sharp.
(cherry picked from commit 9c5911294e0ba71aefe4763731b0e780cde9d0ca)
(cherry picked from commit 1ff3fd33ed6f2ac09c774cc42b0107c5dbd9c19d)
(cherry picked from commit 82c722aae86669325672dd10840447434f15e7fd)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/752e3da7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/752e3da7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/752e3da7
Branch: refs/heads/branch-2.6.1
Commit: 752e3da738080cf7259b9085b736b42956d0bf1e
Parents: 619f793
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Thu Apr 9 13:08:53 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 1 21:31:00 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../security/DelegationTokenRenewer.java | 137 ++++++++++++-------
.../security/TestDelegationTokenRenewer.java | 87 +++++++++++-
3 files changed, 173 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/752e3da7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d7e6622..fca01c0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -120,6 +120,9 @@ Release 2.6.1 - UNRELEASED
YARN-3393. Getting application(s) goes wrong when app finishes before
starting the attempt. (Zhijie Shen via xgong)
+ YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token
+ renewal of applications part of a bigger workflow. (Daryn Sharp via vinodkv)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/752e3da7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index 70849fe..2cd31a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -229,15 +230,16 @@ public class DelegationTokenRenewer extends AbstractService {
@VisibleForTesting
protected static class DelegationTokenToRenew {
public final Token<?> token;
- public final ApplicationId applicationId;
+ public final Collection<ApplicationId> referringAppIds;
public final Configuration conf;
public long expirationDate;
- public TimerTask timerTask;
+ public RenewalTimerTask timerTask;
public volatile boolean shouldCancelAtEnd;
public long maxDate;
public String user;
- public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
+ public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
+ Token<?> token,
Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
String user) {
this.token = token;
@@ -251,20 +253,33 @@ public class DelegationTokenRenewer extends AbstractService {
throw new YarnRuntimeException(e);
}
}
- this.applicationId = jId;
+ this.referringAppIds = Collections.synchronizedSet(
+ new HashSet<ApplicationId>(applicationIds));
this.conf = conf;
this.expirationDate = expirationDate;
this.timerTask = null;
this.shouldCancelAtEnd = shouldCancelAtEnd;
}
- public void setTimerTask(TimerTask tTask) {
+ public void setTimerTask(RenewalTimerTask tTask) {
timerTask = tTask;
}
-
+
+ @VisibleForTesting
+ public void cancelTimer() {
+ if (timerTask != null) {
+ timerTask.cancel();
+ }
+ }
+
+ @VisibleForTesting
+ public boolean isTimerCancelled() {
+ return (timerTask != null) && timerTask.cancelled.get();
+ }
+
@Override
public String toString() {
- return token + ";exp=" + expirationDate;
+ return token + ";exp=" + expirationDate + "; apps=" + referringAppIds;
}
@Override
@@ -416,19 +431,16 @@ public class DelegationTokenRenewer extends AbstractService {
}
DelegationTokenToRenew dttr = allTokens.get(token);
- if (dttr != null) {
- // If any of the jobs sharing the same token doesn't want to cancel
- // the token, we should not cancel the token.
- if (!evt.shouldCancelAtEnd) {
- dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
- LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
- + " for token " + dttr.token);
+ if (dttr == null) {
+ dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
+ getConfig(), now, shouldCancelAtEnd, evt.getUser());
+ try {
+ renewToken(dttr);
+ } catch (IOException ioe) {
+ throw new IOException("Failed to renew token: " + dttr.token, ioe);
}
- continue;
}
-
- tokenList.add(new DelegationTokenToRenew(applicationId, token,
- getConfig(), now, shouldCancelAtEnd, evt.getUser()));
+ tokenList.add(dttr);
}
}
@@ -437,21 +449,21 @@ public class DelegationTokenRenewer extends AbstractService {
// If user provides incorrect token then it should not be added for
// renewal.
for (DelegationTokenToRenew dtr : tokenList) {
- try {
- renewToken(dtr);
- } catch (IOException ioe) {
- throw new IOException("Failed to renew token: " + dtr.token, ioe);
+ DelegationTokenToRenew currentDtr =
+ allTokens.putIfAbsent(dtr.token, dtr);
+ if (currentDtr != null) {
+ // another job beat us
+ currentDtr.referringAppIds.add(applicationId);
+ appTokens.get(applicationId).add(currentDtr);
+ } else {
+ appTokens.get(applicationId).add(dtr);
+ setTimerForTokenRenewal(dtr);
}
}
- for (DelegationTokenToRenew dtr : tokenList) {
- appTokens.get(applicationId).add(dtr);
- allTokens.put(dtr.token, dtr);
- setTimerForTokenRenewal(dtr);
- }
}
if (!hasHdfsToken) {
- requestNewHdfsDelegationToken(applicationId, evt.getUser(),
+ requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
shouldCancelAtEnd);
}
}
@@ -479,7 +491,7 @@ public class DelegationTokenRenewer extends AbstractService {
try {
requestNewHdfsDelegationTokenIfNeeded(dttr);
// if the token is not replaced by a new token, renew the token
- if (appTokens.get(dttr.applicationId).contains(dttr)) {
+ if (!dttr.isTimerCancelled()) {
renewToken(dttr);
setTimerForTokenRenewal(dttr);// set the next one
} else {
@@ -509,12 +521,12 @@ public class DelegationTokenRenewer extends AbstractService {
long expiresIn = token.expirationDate - System.currentTimeMillis();
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
// need to create new task every time
- TimerTask tTask = new RenewalTimerTask(token);
+ RenewalTimerTask tTask = new RenewalTimerTask(token);
token.setTimerTask(tTask); // keep reference to the timer
renewalTimer.schedule(token.timerTask, new Date(renewIn));
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
- + token.applicationId);
+ + token.referringAppIds);
}
// renew a token
@@ -536,7 +548,7 @@ public class DelegationTokenRenewer extends AbstractService {
throw new IOException(e);
}
LOG.info("Renewed delegation-token= [" + dttr + "], for "
- + dttr.applicationId);
+ + dttr.referringAppIds);
}
// Request new hdfs token if the token is about to expire, and remove the old
@@ -549,30 +561,37 @@ public class DelegationTokenRenewer extends AbstractService {
&& dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
&& dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+ final Collection<ApplicationId> applicationIds;
+ synchronized (dttr.referringAppIds) {
+ applicationIds = new HashSet<ApplicationId>(dttr.referringAppIds);
+ dttr.referringAppIds.clear();
+ }
// remove all old expiring hdfs tokens for this application.
- Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
- if (tokenSet != null && !tokenSet.isEmpty()) {
+ for (ApplicationId appId : applicationIds) {
+ Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
+ if (tokenSet == null || tokenSet.isEmpty()) {
+ continue;
+ }
Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
synchronized (tokenSet) {
while (iter.hasNext()) {
DelegationTokenToRenew t = iter.next();
if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
iter.remove();
- if (t.timerTask != null) {
- t.timerTask.cancel();
- }
+ t.cancelTimer();
LOG.info("Removed expiring token " + t);
}
}
}
}
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
- requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
- dttr.shouldCancelAtEnd);
+ requestNewHdfsDelegationToken(applicationIds, dttr.user,
+ dttr.shouldCancelAtEnd);
}
}
- private void requestNewHdfsDelegationToken(ApplicationId applicationId,
+ private void requestNewHdfsDelegationToken(
+ Collection<ApplicationId> referringAppIds,
String user, boolean shouldCancelAtEnd) throws IOException,
InterruptedException {
if (!hasProxyUserPrivileges) {
@@ -584,18 +603,20 @@ public class DelegationTokenRenewer extends AbstractService {
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
// Add new tokens to the toRenew list.
- LOG.info("Received new tokens for " + applicationId + ". Received "
+ LOG.info("Received new tokens for " + referringAppIds + ". Received "
+ newTokens.length + " tokens.");
if (newTokens.length > 0) {
for (Token<?> token : newTokens) {
if (token.isManaged()) {
DelegationTokenToRenew tokenToRenew =
- new DelegationTokenToRenew(applicationId, token, getConfig(),
+ new DelegationTokenToRenew(referringAppIds, token, getConfig(),
Time.now(), shouldCancelAtEnd, user);
// renew the token to get the next expiration date.
renewToken(tokenToRenew);
setTimerForTokenRenewal(tokenToRenew);
- appTokens.get(applicationId).add(tokenToRenew);
+ for (ApplicationId applicationId : referringAppIds) {
+ appTokens.get(applicationId).add(tokenToRenew);
+ }
LOG.info("Received new token " + token);
}
}
@@ -603,7 +624,9 @@ public class DelegationTokenRenewer extends AbstractService {
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+ for (ApplicationId applicationId : referringAppIds) {
+ rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+ }
}
protected Token<?>[] obtainSystemTokensForUser(String user,
@@ -637,16 +660,18 @@ public class DelegationTokenRenewer extends AbstractService {
* @param applicationId
*/
private void removeFailedDelegationToken(DelegationTokenToRenew t) {
- ApplicationId applicationId = t.applicationId;
- LOG.error("removing failed delegation token for appid=" + applicationId
- + ";t=" + t.token.getService());
- appTokens.get(applicationId).remove(t);
+ Collection<ApplicationId> applicationIds = t.referringAppIds;
+ synchronized (applicationIds) {
+ LOG.error("removing failed delegation token for appid=" + applicationIds
+ + ";t=" + t.token.getService());
+ for (ApplicationId applicationId : applicationIds) {
+ appTokens.get(applicationId).remove(t);
+ }
+ }
allTokens.remove(t.token);
// cancel the timer
- if (t.timerTask != null) {
- t.timerTask.cancel();
- }
+ t.cancelTimer();
}
/**
@@ -699,9 +724,15 @@ public class DelegationTokenRenewer extends AbstractService {
+ "; token=" + dttr.token.getService());
}
+ // continue if the app list isn't empty
+ synchronized(dttr.referringAppIds) {
+ dttr.referringAppIds.remove(applicationId);
+ if (!dttr.referringAppIds.isEmpty()) {
+ continue;
+ }
+ }
// cancel the timer
- if (dttr.timerTask != null)
- dttr.timerTask.cancel();
+ dttr.cancelTimer();
// cancel the token
cancelToken(dttr);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/752e3da7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 5d31404..1a2f96c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
@@ -123,7 +124,7 @@ public class TestDelegationTokenRenewer {
counter = 0;
lastRenewed = null;
tokenToRenewIn2Sec = null;
-
+ cancelled = false;
}
@Override
@@ -1022,4 +1023,88 @@ public class TestDelegationTokenRenewer {
// app2 completes, app1 is still running, check the token is not cancelled
Assert.assertFalse(Renewer.cancelled);
}
+
+ // Test submitting an application with the token obtained by a previously
+ // submitted application that is set to be cancelled. Token should be
+ // renewed while all apps are running, and then cancelled when all apps
+ // complete
+ @Test (timeout = 30000)
+ public void testCancelWithMultipleAppSubmissions() throws Exception{
+ MockRM rm = new TestSecurityMockRM(conf, null);
+ rm.start();
+ final MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ //MyFS fs = (MyFS)FileSystem.get(conf);
+ //MyToken token1 = fs.getDelegationToken("user123");
+
+ // create Token1:
+ Text userText1 = new Text("user");
+ DelegationTokenIdentifier dtId1 =
+ new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+ userText1);
+ final Token<DelegationTokenIdentifier> token1 =
+ new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+ "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+ Credentials credentials = new Credentials();
+ credentials.addToken(token1.getService(), token1);
+
+ DelegationTokenRenewer renewer =
+ rm.getRMContext().getDelegationTokenRenewer();
+ Assert.assertTrue(renewer.getAllTokens().isEmpty());
+ Assert.assertFalse(Renewer.cancelled);
+
+ RMApp app1 =
+ rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+ null, true, false, false, null, 0, null, true);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+ rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ DelegationTokenToRenew dttr = renewer.getAllTokens().get(token1);
+ Assert.assertNotNull(dttr);
+ Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+ RMApp app2 =
+ rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+ null, true, false, false, null, 0, null, true);
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+ rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
+ Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+ Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId()));
+ Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId()));
+ Assert.assertFalse(Renewer.cancelled);
+
+ MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
+ // app2 completes, app1 is still running, check the token is not cancelled
+ Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+ Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+ Assert.assertFalse(dttr.referringAppIds.contains(app2.getApplicationId()));
+ Assert.assertFalse(dttr.isTimerCancelled());
+ Assert.assertFalse(Renewer.cancelled);
+
+ RMApp app3 =
+ rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+ null, true, false, false, null, 0, null, true);
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1);
+ rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING);
+ Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+ Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+ Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
+ Assert.assertFalse(dttr.isTimerCancelled());
+ Assert.assertFalse(Renewer.cancelled);
+
+ MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
+ Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+ Assert.assertFalse(dttr.referringAppIds.contains(app1.getApplicationId()));
+ Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
+ Assert.assertFalse(dttr.isTimerCancelled());
+ Assert.assertFalse(Renewer.cancelled);
+
+ MockRM.finishAMAndVerifyAppState(app3, rm, nm1, am3);
+ Assert.assertFalse(renewer.getAllTokens().containsKey(token1));
+ Assert.assertTrue(dttr.referringAppIds.isEmpty());
+ Assert.assertTrue(dttr.isTimerCancelled());
+ Assert.assertTrue(Renewer.cancelled);
+ }
}