You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by at...@apache.org on 2014/11/17 22:07:00 UTC
[2/2] hadoop git commit: HADOOP-11157. ZKDelegationTokenSecretManager
never shuts down listenerThreadPool. Contributed by Arun Suresh. (cherry
picked from commit 07d489e6230682e0553840bb1a0e446acb9f8d19)
HADOOP-11157. ZKDelegationTokenSecretManager never shuts down listenerThreadPool. Contributed by Arun Suresh.
(cherry picked from commit 07d489e6230682e0553840bb1a0e446acb9f8d19)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d35eba7b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d35eba7b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d35eba7b
Branch: refs/heads/branch-2
Commit: d35eba7b1ff98e2e542a6c6c5b389fcc20d885c7
Parents: 6eb88c2
Author: Aaron T. Myers <at...@apache.org>
Authored: Mon Nov 17 12:57:52 2014 -0800
Committer: Aaron T. Myers <at...@apache.org>
Committed: Mon Nov 17 13:02:59 2014 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../ZKDelegationTokenSecretManager.java | 126 +++++++--
.../TestZKDelegationTokenSecretManager.java | 275 +++++++++++++++----
3 files changed, 331 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35eba7b/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index ec26ac0..1942b9f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -85,6 +85,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11294. Nfs3FileAttributes should not change the values of rdev,
nlink and size in the constructor. (Brandon Li via wheat9)
+ HADOOP-11157. ZKDelegationTokenSecretManager never shuts down
+ listenerThreadPool. (Arun Suresh via atm)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35eba7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index ebc45a5..d6bc995 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.login.AppConfigurationEntry;
@@ -38,6 +39,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -48,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -80,6 +83,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
+ "zkSessionTimeout";
public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX
+ "zkConnectionTimeout";
+ public static final String ZK_DTSM_ZK_SHUTDOWN_TIMEOUT = ZK_CONF_PREFIX
+ + "zkShutdownTimeout";
public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX
+ "znodeWorkingPath";
public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX
@@ -94,6 +99,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
+ public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000;
public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
private static Logger LOG = LoggerFactory
@@ -125,6 +131,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private PathChildrenCache keyCache;
private PathChildrenCache tokenCache;
private ExecutorService listenerThreadPool;
+ private final long shutdownTimeout;
public ZKDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
@@ -135,6 +142,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000),
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+ shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
+ ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
if (CURATOR_TL.get() != null) {
zkClient =
CURATOR_TL.get().usingNamespace(
@@ -199,7 +208,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
.build();
isExternalClient = false;
}
- listenerThreadPool = Executors.newFixedThreadPool(2);
}
private String setJaasConfiguration(Configuration config) throws Exception {
@@ -290,6 +298,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
throw new IOException("Could not start Curator Framework", e);
}
}
+ listenerThreadPool = Executors.newSingleThreadExecutor();
try {
delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (delTokSeqCounter != null) {
@@ -315,7 +324,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try {
keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
if (keyCache != null) {
- keyCache.start(StartMode.POST_INITIALIZED_EVENT);
+ keyCache.start(StartMode.BUILD_INITIAL_CACHE);
keyCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
@@ -343,7 +352,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
if (tokenCache != null) {
- tokenCache.start(StartMode.POST_INITIALIZED_EVENT);
+ tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
@@ -351,13 +360,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
- processTokenAddOrUpdate(event.getData().getData());
+ processTokenAddOrUpdate(event.getData());
break;
case CHILD_UPDATED:
- processTokenAddOrUpdate(event.getData().getData());
+ processTokenAddOrUpdate(event.getData());
break;
case CHILD_REMOVED:
- processTokenRemoved(event.getData().getData());
+ processTokenRemoved(event.getData());
break;
default:
break;
@@ -376,7 +385,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey();
key.readFields(din);
- allKeys.put(key.getKeyId(), key);
+ synchronized (this) {
+ allKeys.put(key.getKeyId(), key);
+ }
}
private void processKeyRemoved(String path) {
@@ -386,13 +397,15 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
int j = tokSeg.indexOf('_');
if (j > 0) {
int keyId = Integer.parseInt(tokSeg.substring(j + 1));
- allKeys.remove(keyId);
+ synchronized (this) {
+ allKeys.remove(keyId);
+ }
}
}
}
- private void processTokenAddOrUpdate(byte[] data) throws IOException {
- ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ private void processTokenAddOrUpdate(ChildData data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier();
ident.readFields(din);
@@ -403,41 +416,78 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
if (numRead > -1) {
DelegationTokenInformation tokenInfo =
new DelegationTokenInformation(renewDate, password);
- currentTokens.put(ident, tokenInfo);
+ synchronized (this) {
+ currentTokens.put(ident, tokenInfo);
+ // The cancel task might be waiting
+ notifyAll();
+ }
}
}
- private void processTokenRemoved(byte[] data) throws IOException {
- ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ private void processTokenRemoved(ChildData data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier();
ident.readFields(din);
- currentTokens.remove(ident);
+ synchronized (this) {
+ currentTokens.remove(ident);
+ // The cancel task might be waiting
+ notifyAll();
+ }
}
@Override
public void stopThreads() {
+ super.stopThreads();
try {
- if (!isExternalClient && (zkClient != null)) {
- zkClient.close();
+ if (tokenCache != null) {
+ tokenCache.close();
}
+ } catch (Exception e) {
+ LOG.error("Could not stop Delegation Token Cache", e);
+ }
+ try {
if (delTokSeqCounter != null) {
delTokSeqCounter.close();
}
+ } catch (Exception e) {
+ LOG.error("Could not stop Delegation Token Counter", e);
+ }
+ try {
if (keyIdSeqCounter != null) {
keyIdSeqCounter.close();
}
+ } catch (Exception e) {
+ LOG.error("Could not stop Key Id Counter", e);
+ }
+ try {
if (keyCache != null) {
keyCache.close();
}
- if (tokenCache != null) {
- tokenCache.close();
+ } catch (Exception e) {
+ LOG.error("Could not stop KeyCache", e);
+ }
+ try {
+ if (!isExternalClient && (zkClient != null)) {
+ zkClient.close();
}
} catch (Exception e) {
LOG.error("Could not stop Curator Framework", e);
- // Ignore
}
- super.stopThreads();
+ if (listenerThreadPool != null) {
+ listenerThreadPool.shutdown();
+ try {
+ // wait for existing tasks to terminate
+ if (!listenerThreadPool.awaitTermination(shutdownTimeout,
+ TimeUnit.MILLISECONDS)) {
+ LOG.error("Forcing Listener threadPool to shutdown !!");
+ listenerThreadPool.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ listenerThreadPool.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
}
private void createPersistentNode(String nodePath) throws Exception {
@@ -460,6 +510,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try {
while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) {
}
+ } catch (InterruptedException e) {
+ // The ExpirationThread is just finishing.. so dont do anything..
+ LOG.debug("Thread interrupted while performing token counter increment", e);
+ Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
@@ -485,6 +539,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try {
while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) {
}
+ } catch (InterruptedException e) {
+ // The ExpirationThread is just finishing.. so dont do anything..
+ LOG.debug("Thread interrupted while performing keyId increment", e);
+ Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared keyId counter !!", e);
}
@@ -588,13 +646,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
@Override
protected void storeDelegationKey(DelegationKey key) throws IOException {
- allKeys.put(key.getKeyId(), key);
addOrUpdateDelegationKey(key, false);
}
@Override
protected void updateDelegationKey(DelegationKey key) throws IOException {
- allKeys.put(key.getKeyId(), key);
addOrUpdateDelegationKey(key, true);
}
@@ -658,7 +714,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
@Override
protected void storeToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
- currentTokens.put(ident, tokenInfo);
try {
addOrUpdateToken(ident, tokenInfo, false);
} catch (Exception e) {
@@ -669,7 +724,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
@Override
protected void updateToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
- currentTokens.put(ident, tokenInfo);
String nodeRemovePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber());
@@ -711,6 +765,25 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
}
+ @Override
+ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
+ String canceller) throws IOException {
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ TokenIdent id = createIdentifier();
+ id.readFields(in);
+ try {
+ if (!currentTokens.containsKey(id)) {
+ // See if token can be retrieved and placed in currentTokens
+ getTokenInfo(id);
+ }
+ return super.cancelToken(token, canceller);
+ } catch (Exception e) {
+ LOG.error("Exception while checking if token exist !!", e);
+ return id;
+ }
+ }
+
private void addOrUpdateToken(TokenIdent ident,
DelegationTokenInformation info, boolean isUpdate) throws Exception {
String nodeCreatePath =
@@ -772,4 +845,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
static String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
+
+ @VisibleForTesting
+ public ExecutorService getListenerThreadPool() {
+ return listenerThreadPool;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35eba7b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
index b3049c4..6435c0b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.security.token.delegation;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -36,6 +40,12 @@ import org.junit.Test;
public class TestZKDelegationTokenSecretManager {
+ private static final int TEST_RETRIES = 2;
+
+ private static final int RETRY_COUNT = 5;
+
+ private static final int RETRY_WAIT = 1000;
+
private static final long DAY_IN_SECS = 86400;
private TestingServer zkServer;
@@ -59,6 +69,7 @@ public class TestZKDelegationTokenSecretManager {
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
+ conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100);
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
@@ -69,80 +80,246 @@ public class TestZKDelegationTokenSecretManager {
@SuppressWarnings("unchecked")
@Test
public void testMultiNodeOperations() throws Exception {
- DelegationTokenManager tm1, tm2 = null;
- String connectString = zkServer.getConnectString();
- Configuration conf = getSecretConf(connectString);
- tm1 = new DelegationTokenManager(conf, new Text("bla"));
- tm1.init();
- tm2 = new DelegationTokenManager(conf, new Text("bla"));
- tm2.init();
+ for (int i = 0; i < TEST_RETRIES; i++) {
+ DelegationTokenManager tm1, tm2 = null;
+ String connectString = zkServer.getConnectString();
+ Configuration conf = getSecretConf(connectString);
+ tm1 = new DelegationTokenManager(conf, new Text("bla"));
+ tm1.init();
+ tm2 = new DelegationTokenManager(conf, new Text("bla"));
+ tm2.init();
- Token<DelegationTokenIdentifier> token =
- (Token<DelegationTokenIdentifier>) tm1.createToken(
- UserGroupInformation.getCurrentUser(), "foo");
- Assert.assertNotNull(token);
- tm2.verifyToken(token);
- tm2.renewToken(token, "foo");
- tm1.verifyToken(token);
- tm1.cancelToken(token, "foo");
- try {
+ Token<DelegationTokenIdentifier> token =
+ (Token<DelegationTokenIdentifier>) tm1.createToken(
+ UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
tm2.verifyToken(token);
- fail("Expected InvalidToken");
- } catch (SecretManager.InvalidToken it) {
- // Ignore
- }
+ tm2.renewToken(token, "foo");
+ tm1.verifyToken(token);
+ tm1.cancelToken(token, "foo");
+ try {
+ verifyTokenFail(tm2, token);
+ fail("Expected InvalidToken");
+ } catch (SecretManager.InvalidToken it) {
+ // Ignore
+ }
- token = (Token<DelegationTokenIdentifier>) tm2.createToken(
- UserGroupInformation.getCurrentUser(), "bar");
- Assert.assertNotNull(token);
- tm1.verifyToken(token);
- tm1.renewToken(token, "bar");
- tm2.verifyToken(token);
- tm2.cancelToken(token, "bar");
- try {
+ token = (Token<DelegationTokenIdentifier>) tm2.createToken(
+ UserGroupInformation.getCurrentUser(), "bar");
+ Assert.assertNotNull(token);
tm1.verifyToken(token);
- fail("Expected InvalidToken");
- } catch (SecretManager.InvalidToken it) {
- // Ignore
+ tm1.renewToken(token, "bar");
+ tm2.verifyToken(token);
+ tm2.cancelToken(token, "bar");
+ try {
+ verifyTokenFail(tm1, token);
+ fail("Expected InvalidToken");
+ } catch (SecretManager.InvalidToken it) {
+ // Ignore
+ }
+ verifyDestroy(tm1, conf);
+ verifyDestroy(tm2, conf);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testNodeUpAferAWhile() throws Exception {
+ for (int i = 0; i < TEST_RETRIES; i++) {
+ String connectString = zkServer.getConnectString();
+ Configuration conf = getSecretConf(connectString);
+ DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("bla"));
+ tm1.init();
+ Token<DelegationTokenIdentifier> token1 =
+ (Token<DelegationTokenIdentifier>) tm1.createToken(
+ UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token1);
+ Token<DelegationTokenIdentifier> token2 =
+ (Token<DelegationTokenIdentifier>) tm1.createToken(
+ UserGroupInformation.getCurrentUser(), "bar");
+ Assert.assertNotNull(token2);
+ Token<DelegationTokenIdentifier> token3 =
+ (Token<DelegationTokenIdentifier>) tm1.createToken(
+ UserGroupInformation.getCurrentUser(), "boo");
+ Assert.assertNotNull(token3);
+
+ tm1.verifyToken(token1);
+ tm1.verifyToken(token2);
+ tm1.verifyToken(token3);
+
+ // Cancel one token
+ tm1.cancelToken(token1, "foo");
+
+ // Start second node after some time..
+ Thread.sleep(1000);
+ DelegationTokenManager tm2 = new DelegationTokenManager(conf, new Text("bla"));
+ tm2.init();
+
+ tm2.verifyToken(token2);
+ tm2.verifyToken(token3);
+ try {
+ verifyTokenFail(tm2, token1);
+ fail("Expected InvalidToken");
+ } catch (SecretManager.InvalidToken it) {
+ // Ignore
+ }
+
+ // Create a new token thru the new ZKDTSM
+ Token<DelegationTokenIdentifier> token4 =
+ (Token<DelegationTokenIdentifier>) tm2.createToken(
+ UserGroupInformation.getCurrentUser(), "xyz");
+ Assert.assertNotNull(token4);
+ tm2.verifyToken(token4);
+ tm1.verifyToken(token4);
+
+ // Bring down tm2
+ verifyDestroy(tm2, conf);
+
+ // Start third node after some time..
+ Thread.sleep(1000);
+ DelegationTokenManager tm3 = new DelegationTokenManager(conf, new Text("bla"));
+ tm3.init();
+
+ tm3.verifyToken(token2);
+ tm3.verifyToken(token3);
+ tm3.verifyToken(token4);
+ try {
+ verifyTokenFail(tm3, token1);
+ fail("Expected InvalidToken");
+ } catch (SecretManager.InvalidToken it) {
+ // Ignore
+ }
+
+ verifyDestroy(tm3, conf);
+ verifyDestroy(tm1, conf);
}
}
@SuppressWarnings("unchecked")
@Test
public void testRenewTokenSingleManager() throws Exception {
- DelegationTokenManager tm1 = null;
- String connectString = zkServer.getConnectString();
- Configuration conf = getSecretConf(connectString);
- tm1 = new DelegationTokenManager(conf, new Text("foo"));
- tm1.init();
+ for (int i = 0; i < TEST_RETRIES; i++) {
+ DelegationTokenManager tm1 = null;
+ String connectString = zkServer.getConnectString();
+ Configuration conf = getSecretConf(connectString);
+ tm1 = new DelegationTokenManager(conf, new Text("foo"));
+ tm1.init();
- Token<DelegationTokenIdentifier> token =
- (Token<DelegationTokenIdentifier>)
- tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
- Assert.assertNotNull(token);
- tm1.renewToken(token, "foo");
- tm1.verifyToken(token);
+ Token<DelegationTokenIdentifier> token =
+ (Token<DelegationTokenIdentifier>)
+ tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
+ tm1.renewToken(token, "foo");
+ tm1.verifyToken(token);
+ verifyDestroy(tm1, conf);
+ }
}
@SuppressWarnings("unchecked")
@Test
public void testCancelTokenSingleManager() throws Exception {
+ for (int i = 0; i < TEST_RETRIES; i++) {
+ DelegationTokenManager tm1 = null;
+ String connectString = zkServer.getConnectString();
+ Configuration conf = getSecretConf(connectString);
+ tm1 = new DelegationTokenManager(conf, new Text("foo"));
+ tm1.init();
+
+ Token<DelegationTokenIdentifier> token =
+ (Token<DelegationTokenIdentifier>)
+ tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
+ tm1.cancelToken(token, "foo");
+ try {
+ verifyTokenFail(tm1, token);
+ fail("Expected InvalidToken");
+ } catch (SecretManager.InvalidToken it) {
+ it.printStackTrace();
+ }
+ verifyDestroy(tm1, conf);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ protected void verifyDestroy(DelegationTokenManager tm, Configuration conf)
+ throws Exception {
+ AbstractDelegationTokenSecretManager sm =
+ tm.getDelegationTokenSecretManager();
+ ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm;
+ ExecutorService es = zksm.getListenerThreadPool();
+ tm.destroy();
+ Assert.assertTrue(es.isShutdown());
+ // wait for the pool to terminate
+ long timeout =
+ conf.getLong(
+ ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
+ ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
+ Thread.sleep(timeout * 3);
+ Assert.assertTrue(es.isTerminated());
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testStopThreads() throws Exception {
DelegationTokenManager tm1 = null;
String connectString = zkServer.getConnectString();
+
+ // let's make the update interval short and the shutdown interval
+ // comparatively longer, so if the update thread runs after shutdown,
+ // it will cause an error.
+ final long updateIntervalSeconds = 1;
+ final long shutdownTimeoutMillis = updateIntervalSeconds * 1000 * 5;
Configuration conf = getSecretConf(connectString);
+ conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, updateIntervalSeconds);
+ conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, updateIntervalSeconds);
+ conf.setLong(DelegationTokenManager.RENEW_INTERVAL, updateIntervalSeconds);
+
+ conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, shutdownTimeoutMillis);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
Token<DelegationTokenIdentifier> token =
- (Token<DelegationTokenIdentifier>)
- tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ (Token<DelegationTokenIdentifier>)
+ tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
- tm1.cancelToken(token, "foo");
+
+ AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
+ ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
+ ExecutorService es = zksm.getListenerThreadPool();
+ es.submit(new Callable<Void>() {
+ public Void call() throws Exception {
+ Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
+ return null;
+ }
+ });
+
+ tm1.destroy();
+ }
+
+ // Since it is possible that there can be a delay for the cancel token message
+ // initiated by one node to reach another node.. The second node can ofcourse
+ // verify with ZK directly if the token that needs verification has been
+ // cancelled but.. that would mean having to make an RPC call for every
+ // verification request.
+ // Thus, the eventual consistency tradef-off should be acceptable here...
+ private void verifyTokenFail(DelegationTokenManager tm,
+ Token<DelegationTokenIdentifier> token) throws IOException,
+ InterruptedException {
+ verifyTokenFailWithRetry(tm, token, RETRY_COUNT);
+ }
+
+ private void verifyTokenFailWithRetry(DelegationTokenManager tm,
+ Token<DelegationTokenIdentifier> token, int retryCount)
+ throws IOException, InterruptedException {
try {
- tm1.verifyToken(token);
- fail("Expected InvalidToken");
- } catch (SecretManager.InvalidToken it) {
- it.printStackTrace();
+ tm.verifyToken(token);
+ } catch (SecretManager.InvalidToken er) {
+ throw er;
+ }
+ if (retryCount > 0) {
+ Thread.sleep(RETRY_WAIT);
+ verifyTokenFailWithRetry(tm, token, retryCount - 1);
}
}
+
}