You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by at...@apache.org on 2014/11/17 22:07:00 UTC

[2/2] hadoop git commit: HADOOP-11157. ZKDelegationTokenSecretManager never shuts down listenerThreadPool. Contributed by Arun Suresh. (cherry picked from commit 07d489e6230682e0553840bb1a0e446acb9f8d19)

HADOOP-11157. ZKDelegationTokenSecretManager never shuts down listenerThreadPool. Contributed by Arun Suresh.
(cherry picked from commit 07d489e6230682e0553840bb1a0e446acb9f8d19)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d35eba7b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d35eba7b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d35eba7b

Branch: refs/heads/branch-2
Commit: d35eba7b1ff98e2e542a6c6c5b389fcc20d885c7
Parents: 6eb88c2
Author: Aaron T. Myers <at...@apache.org>
Authored: Mon Nov 17 12:57:52 2014 -0800
Committer: Aaron T. Myers <at...@apache.org>
Committed: Mon Nov 17 13:02:59 2014 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../ZKDelegationTokenSecretManager.java         | 126 +++++++--
 .../TestZKDelegationTokenSecretManager.java     | 275 +++++++++++++++----
 3 files changed, 331 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35eba7b/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index ec26ac0..1942b9f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -85,6 +85,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11294. Nfs3FileAttributes should not change the values of rdev,
     nlink and size in the constructor. (Brandon Li via wheat9)
 
+    HADOOP-11157. ZKDelegationTokenSecretManager never shuts down
+    listenerThreadPool. (Arun Suresh via atm)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35eba7b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index ebc45a5..d6bc995 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.AppConfigurationEntry;
 
@@ -38,6 +39,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -48,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -80,6 +83,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       + "zkSessionTimeout";
   public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX
       + "zkConnectionTimeout";
+  public static final String ZK_DTSM_ZK_SHUTDOWN_TIMEOUT = ZK_CONF_PREFIX
+      + "zkShutdownTimeout";
   public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX
       + "znodeWorkingPath";
   public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX
@@ -94,6 +99,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
   public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
   public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
+  public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000;
   public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
 
   private static Logger LOG = LoggerFactory
@@ -125,6 +131,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   private PathChildrenCache keyCache;
   private PathChildrenCache tokenCache;
   private ExecutorService listenerThreadPool;
+  private final long shutdownTimeout;
 
   public ZKDelegationTokenSecretManager(Configuration conf) {
     super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
@@ -135,6 +142,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
             DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000),
         conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
             DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+    shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
+        ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
     if (CURATOR_TL.get() != null) {
       zkClient =
           CURATOR_TL.get().usingNamespace(
@@ -199,7 +208,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
           .build();
       isExternalClient = false;
     }
-    listenerThreadPool = Executors.newFixedThreadPool(2);
   }
 
   private String setJaasConfiguration(Configuration config) throws Exception {
@@ -290,6 +298,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
         throw new IOException("Could not start Curator Framework", e);
       }
     }
+    listenerThreadPool = Executors.newSingleThreadExecutor();
     try {
       delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
       if (delTokSeqCounter != null) {
@@ -315,7 +324,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     try {
       keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
       if (keyCache != null) {
-        keyCache.start(StartMode.POST_INITIALIZED_EVENT);
+        keyCache.start(StartMode.BUILD_INITIAL_CACHE);
         keyCache.getListenable().addListener(new PathChildrenCacheListener() {
           @Override
           public void childEvent(CuratorFramework client,
@@ -343,7 +352,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     try {
       tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
       if (tokenCache != null) {
-        tokenCache.start(StartMode.POST_INITIALIZED_EVENT);
+        tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
         tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
 
           @Override
@@ -351,13 +360,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
               PathChildrenCacheEvent event) throws Exception {
             switch (event.getType()) {
             case CHILD_ADDED:
-              processTokenAddOrUpdate(event.getData().getData());
+              processTokenAddOrUpdate(event.getData());
               break;
             case CHILD_UPDATED:
-              processTokenAddOrUpdate(event.getData().getData());
+              processTokenAddOrUpdate(event.getData());
               break;
             case CHILD_REMOVED:
-              processTokenRemoved(event.getData().getData());
+              processTokenRemoved(event.getData());
               break;
             default:
               break;
@@ -376,7 +385,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     DataInputStream din = new DataInputStream(bin);
     DelegationKey key = new DelegationKey();
     key.readFields(din);
-    allKeys.put(key.getKeyId(), key);
+    synchronized (this) {
+      allKeys.put(key.getKeyId(), key);
+    }
   }
 
   private void processKeyRemoved(String path) {
@@ -386,13 +397,15 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       int j = tokSeg.indexOf('_');
       if (j > 0) {
         int keyId = Integer.parseInt(tokSeg.substring(j + 1));
-        allKeys.remove(keyId);
+        synchronized (this) {
+          allKeys.remove(keyId);
+        }
       }
     }
   }
 
-  private void processTokenAddOrUpdate(byte[] data) throws IOException {
-    ByteArrayInputStream bin = new ByteArrayInputStream(data);
+  private void processTokenAddOrUpdate(ChildData data) throws IOException {
+    ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
     DataInputStream din = new DataInputStream(bin);
     TokenIdent ident = createIdentifier();
     ident.readFields(din);
@@ -403,41 +416,78 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     if (numRead > -1) {
       DelegationTokenInformation tokenInfo =
           new DelegationTokenInformation(renewDate, password);
-      currentTokens.put(ident, tokenInfo);
+      synchronized (this) {
+        currentTokens.put(ident, tokenInfo);
+        // The cancel task might be waiting
+        notifyAll();
+      }
     }
   }
 
-  private void processTokenRemoved(byte[] data) throws IOException {
-    ByteArrayInputStream bin = new ByteArrayInputStream(data);
+  private void processTokenRemoved(ChildData data) throws IOException {
+    ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
     DataInputStream din = new DataInputStream(bin);
     TokenIdent ident = createIdentifier();
     ident.readFields(din);
-    currentTokens.remove(ident);
+    synchronized (this) {
+      currentTokens.remove(ident);
+      // The cancel task might be waiting
+      notifyAll();
+    }
   }
 
   @Override
   public void stopThreads() {
+    super.stopThreads();
     try {
-      if (!isExternalClient && (zkClient != null)) {
-        zkClient.close();
+      if (tokenCache != null) {
+        tokenCache.close();
       }
+    } catch (Exception e) {
+      LOG.error("Could not stop Delegation Token Cache", e);
+    }
+    try {
       if (delTokSeqCounter != null) {
         delTokSeqCounter.close();
       }
+    } catch (Exception e) {
+      LOG.error("Could not stop Delegation Token Counter", e);
+    }
+    try {
       if (keyIdSeqCounter != null) {
         keyIdSeqCounter.close();
       }
+    } catch (Exception e) {
+      LOG.error("Could not stop Key Id Counter", e);
+    }
+    try {
       if (keyCache != null) {
         keyCache.close();
       }
-      if (tokenCache != null) {
-        tokenCache.close();
+    } catch (Exception e) {
+      LOG.error("Could not stop KeyCache", e);
+    }
+    try {
+      if (!isExternalClient && (zkClient != null)) {
+        zkClient.close();
       }
     } catch (Exception e) {
       LOG.error("Could not stop Curator Framework", e);
-      // Ignore
     }
-    super.stopThreads();
+    if (listenerThreadPool != null) {
+      listenerThreadPool.shutdown();
+      try {
+        // wait for existing tasks to terminate
+        if (!listenerThreadPool.awaitTermination(shutdownTimeout,
+            TimeUnit.MILLISECONDS)) {
+          LOG.error("Forcing Listener threadPool to shutdown !!");
+          listenerThreadPool.shutdownNow();
+        }
+      } catch (InterruptedException ie) {
+        listenerThreadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
   }
 
   private void createPersistentNode(String nodePath) throws Exception {
@@ -460,6 +510,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     try {
       while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) {
       }
+    } catch (InterruptedException e) {
+      // The ExpirationThread is just finishing, so only restore the interrupt flag.
+      LOG.debug("Thread interrupted while performing token counter increment", e);
+      Thread.currentThread().interrupt();
     } catch (Exception e) {
       throw new RuntimeException("Could not increment shared counter !!", e);
     }
@@ -485,6 +539,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     try {
       while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) {
       }
+    } catch (InterruptedException e) {
+      // The ExpirationThread is just finishing, so only restore the interrupt flag.
+      LOG.debug("Thread interrupted while performing keyId increment", e);
+      Thread.currentThread().interrupt();
     } catch (Exception e) {
       throw new RuntimeException("Could not increment shared keyId counter !!", e);
     }
@@ -588,13 +646,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
 
   @Override
   protected void storeDelegationKey(DelegationKey key) throws IOException {
-    allKeys.put(key.getKeyId(), key);
     addOrUpdateDelegationKey(key, false);
   }
 
   @Override
   protected void updateDelegationKey(DelegationKey key) throws IOException {
-    allKeys.put(key.getKeyId(), key);
     addOrUpdateDelegationKey(key, true);
   }
 
@@ -658,7 +714,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   @Override
   protected void storeToken(TokenIdent ident,
       DelegationTokenInformation tokenInfo) throws IOException {
-    currentTokens.put(ident, tokenInfo);
     try {
       addOrUpdateToken(ident, tokenInfo, false);
     } catch (Exception e) {
@@ -669,7 +724,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   @Override
   protected void updateToken(TokenIdent ident,
       DelegationTokenInformation tokenInfo) throws IOException {
-    currentTokens.put(ident, tokenInfo);
     String nodeRemovePath =
         getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
             + ident.getSequenceNumber());
@@ -711,6 +765,25 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     }
   }
 
+  @Override
+  public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
+      String canceller) throws IOException {
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    TokenIdent id = createIdentifier();
+    id.readFields(in);
+    try {
+      if (!currentTokens.containsKey(id)) {
+        // See if token can be retrieved and placed in currentTokens
+        getTokenInfo(id);
+      }
+      return super.cancelToken(token, canceller);
+    } catch (Exception e) {
+      LOG.error("Exception while checking if token exist !!", e);
+      return id;
+    }
+  }
+
   private void addOrUpdateToken(TokenIdent ident,
       DelegationTokenInformation info, boolean isUpdate) throws Exception {
     String nodeCreatePath =
@@ -772,4 +845,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   static String getNodePath(String root, String nodeName) {
     return (root + "/" + nodeName);
   }
+
+  @VisibleForTesting
+  public ExecutorService getListenerThreadPool() {
+    return listenerThreadPool;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35eba7b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
index b3049c4..6435c0b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.security.token.delegation;
 
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -36,6 +40,12 @@ import org.junit.Test;
 
 public class TestZKDelegationTokenSecretManager {
 
+  private static final int TEST_RETRIES = 2;
+
+  private static final int RETRY_COUNT = 5;
+
+  private static final int RETRY_WAIT = 1000;
+
   private static final long DAY_IN_SECS = 86400;
 
   private TestingServer zkServer;
@@ -59,6 +69,7 @@ public class TestZKDelegationTokenSecretManager {
    conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
    conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
    conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
+   conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100);
    conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
    conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
    conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
@@ -69,80 +80,246 @@ public class TestZKDelegationTokenSecretManager {
   @SuppressWarnings("unchecked")
   @Test
   public void testMultiNodeOperations() throws Exception {
-    DelegationTokenManager tm1, tm2 = null;
-    String connectString = zkServer.getConnectString();
-    Configuration conf = getSecretConf(connectString);
-    tm1 = new DelegationTokenManager(conf, new Text("bla"));
-    tm1.init();
-    tm2 = new DelegationTokenManager(conf, new Text("bla"));
-    tm2.init();
+    for (int i = 0; i < TEST_RETRIES; i++) {
+      DelegationTokenManager tm1, tm2 = null;
+      String connectString = zkServer.getConnectString();
+      Configuration conf = getSecretConf(connectString);
+      tm1 = new DelegationTokenManager(conf, new Text("bla"));
+      tm1.init();
+      tm2 = new DelegationTokenManager(conf, new Text("bla"));
+      tm2.init();
 
-    Token<DelegationTokenIdentifier> token =
-        (Token<DelegationTokenIdentifier>) tm1.createToken(
-            UserGroupInformation.getCurrentUser(), "foo");
-    Assert.assertNotNull(token);
-    tm2.verifyToken(token);
-    tm2.renewToken(token, "foo");
-    tm1.verifyToken(token);
-    tm1.cancelToken(token, "foo");
-    try {
+      Token<DelegationTokenIdentifier> token =
+          (Token<DelegationTokenIdentifier>) tm1.createToken(
+              UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(token);
       tm2.verifyToken(token);
-      fail("Expected InvalidToken");
-    } catch (SecretManager.InvalidToken it) {
-      // Ignore
-    }
+      tm2.renewToken(token, "foo");
+      tm1.verifyToken(token);
+      tm1.cancelToken(token, "foo");
+      try {
+        verifyTokenFail(tm2, token);
+        fail("Expected InvalidToken");
+      } catch (SecretManager.InvalidToken it) {
+        // Ignore
+      }
 
-    token = (Token<DelegationTokenIdentifier>) tm2.createToken(
-        UserGroupInformation.getCurrentUser(), "bar");
-    Assert.assertNotNull(token);
-    tm1.verifyToken(token);
-    tm1.renewToken(token, "bar");
-    tm2.verifyToken(token);
-    tm2.cancelToken(token, "bar");
-    try {
+      token = (Token<DelegationTokenIdentifier>) tm2.createToken(
+          UserGroupInformation.getCurrentUser(), "bar");
+      Assert.assertNotNull(token);
       tm1.verifyToken(token);
-      fail("Expected InvalidToken");
-    } catch (SecretManager.InvalidToken it) {
-      // Ignore
+      tm1.renewToken(token, "bar");
+      tm2.verifyToken(token);
+      tm2.cancelToken(token, "bar");
+      try {
+        verifyTokenFail(tm1, token);
+        fail("Expected InvalidToken");
+      } catch (SecretManager.InvalidToken it) {
+        // Ignore
+      }
+      verifyDestroy(tm1, conf);
+      verifyDestroy(tm2, conf);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNodeUpAferAWhile() throws Exception {
+    for (int i = 0; i < TEST_RETRIES; i++) {
+      String connectString = zkServer.getConnectString();
+      Configuration conf = getSecretConf(connectString);
+      DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("bla"));
+      tm1.init();
+      Token<DelegationTokenIdentifier> token1 =
+          (Token<DelegationTokenIdentifier>) tm1.createToken(
+              UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(token1);
+      Token<DelegationTokenIdentifier> token2 =
+          (Token<DelegationTokenIdentifier>) tm1.createToken(
+              UserGroupInformation.getCurrentUser(), "bar");
+      Assert.assertNotNull(token2);
+      Token<DelegationTokenIdentifier> token3 =
+          (Token<DelegationTokenIdentifier>) tm1.createToken(
+              UserGroupInformation.getCurrentUser(), "boo");
+      Assert.assertNotNull(token3);
+
+      tm1.verifyToken(token1);
+      tm1.verifyToken(token2);
+      tm1.verifyToken(token3);
+
+      // Cancel one token
+      tm1.cancelToken(token1, "foo");
+
+      // Start second node after some time..
+      Thread.sleep(1000);
+      DelegationTokenManager tm2 = new DelegationTokenManager(conf, new Text("bla"));
+      tm2.init();
+
+      tm2.verifyToken(token2);
+      tm2.verifyToken(token3);
+      try {
+        verifyTokenFail(tm2, token1);
+        fail("Expected InvalidToken");
+      } catch (SecretManager.InvalidToken it) {
+        // Ignore
+      }
+
+      // Create a new token through the new ZKDTSM
+      Token<DelegationTokenIdentifier> token4 =
+          (Token<DelegationTokenIdentifier>) tm2.createToken(
+              UserGroupInformation.getCurrentUser(), "xyz");
+      Assert.assertNotNull(token4);
+      tm2.verifyToken(token4);
+      tm1.verifyToken(token4);
+
+      // Bring down tm2
+      verifyDestroy(tm2, conf);
+
+      // Start third node after some time..
+      Thread.sleep(1000);
+      DelegationTokenManager tm3 = new DelegationTokenManager(conf, new Text("bla"));
+      tm3.init();
+
+      tm3.verifyToken(token2);
+      tm3.verifyToken(token3);
+      tm3.verifyToken(token4);
+      try {
+        verifyTokenFail(tm3, token1);
+        fail("Expected InvalidToken");
+      } catch (SecretManager.InvalidToken it) {
+        // Ignore
+      }
+
+      verifyDestroy(tm3, conf);
+      verifyDestroy(tm1, conf);
     }
   }
 
   @SuppressWarnings("unchecked")
   @Test
   public void testRenewTokenSingleManager() throws Exception {
-    DelegationTokenManager tm1 = null;
-    String connectString = zkServer.getConnectString();
-    Configuration conf = getSecretConf(connectString);
-    tm1 = new DelegationTokenManager(conf, new Text("foo"));
-    tm1.init();
+    for (int i = 0; i < TEST_RETRIES; i++) {
+      DelegationTokenManager tm1 = null;
+      String connectString = zkServer.getConnectString();
+      Configuration conf = getSecretConf(connectString);
+      tm1 = new DelegationTokenManager(conf, new Text("foo"));
+      tm1.init();
 
-    Token<DelegationTokenIdentifier> token =
-        (Token<DelegationTokenIdentifier>)
-        tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
-    Assert.assertNotNull(token);
-    tm1.renewToken(token, "foo");
-    tm1.verifyToken(token);
+      Token<DelegationTokenIdentifier> token =
+          (Token<DelegationTokenIdentifier>)
+          tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(token);
+      tm1.renewToken(token, "foo");
+      tm1.verifyToken(token);
+      verifyDestroy(tm1, conf);
+    }
   }
 
   @SuppressWarnings("unchecked")
   @Test
   public void testCancelTokenSingleManager() throws Exception {
+    for (int i = 0; i < TEST_RETRIES; i++) {
+      DelegationTokenManager tm1 = null;
+      String connectString = zkServer.getConnectString();
+      Configuration conf = getSecretConf(connectString);
+      tm1 = new DelegationTokenManager(conf, new Text("foo"));
+      tm1.init();
+
+      Token<DelegationTokenIdentifier> token =
+          (Token<DelegationTokenIdentifier>)
+          tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(token);
+      tm1.cancelToken(token, "foo");
+      try {
+        verifyTokenFail(tm1, token);
+        fail("Expected InvalidToken");
+      } catch (SecretManager.InvalidToken it) {
+        it.printStackTrace();
+      }
+      verifyDestroy(tm1, conf);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  protected void verifyDestroy(DelegationTokenManager tm, Configuration conf)
+      throws Exception {
+    AbstractDelegationTokenSecretManager sm =
+        tm.getDelegationTokenSecretManager();
+    ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm;
+    ExecutorService es = zksm.getListenerThreadPool();
+    tm.destroy();
+    Assert.assertTrue(es.isShutdown());
+    // wait for the pool to terminate
+    long timeout =
+        conf.getLong(
+            ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
+            ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
+    Thread.sleep(timeout * 3);
+    Assert.assertTrue(es.isTerminated());
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testStopThreads() throws Exception {
     DelegationTokenManager tm1 = null;
     String connectString = zkServer.getConnectString();
+
+    // let's make the update interval short and the shutdown interval
+    // comparatively longer, so if the update thread runs after shutdown,
+    // it will cause an error.
+    final long updateIntervalSeconds = 1;
+    final long shutdownTimeoutMillis = updateIntervalSeconds * 1000 * 5;
     Configuration conf = getSecretConf(connectString);
+    conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, updateIntervalSeconds);
+    conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, updateIntervalSeconds);
+    conf.setLong(DelegationTokenManager.RENEW_INTERVAL, updateIntervalSeconds);
+
+    conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, shutdownTimeoutMillis);
     tm1 = new DelegationTokenManager(conf, new Text("foo"));
     tm1.init();
 
     Token<DelegationTokenIdentifier> token =
-        (Token<DelegationTokenIdentifier>)
-        tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      (Token<DelegationTokenIdentifier>)
+    tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
     Assert.assertNotNull(token);
-    tm1.cancelToken(token, "foo");
+
+    AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
+    ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
+    ExecutorService es = zksm.getListenerThreadPool();
+    es.submit(new Callable<Void>() {
+      public Void call() throws Exception {
+        Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
+        return null;
+      }
+    });
+
+    tm1.destroy();
+  }
+
+  // A cancel-token message initiated by one node may take some time to reach
+  // another node. The second node could of course verify with ZK directly
+  // whether the token that needs verification has been cancelled, but that
+  // would mean having to make an RPC call for every verification request.
+  // Thus, the eventual consistency trade-off should be acceptable here, and
+  // this helper retries the verification until the token is seen as invalid.
+  private void verifyTokenFail(DelegationTokenManager tm,
+      Token<DelegationTokenIdentifier> token) throws IOException,
+      InterruptedException {
+    verifyTokenFailWithRetry(tm, token, RETRY_COUNT);
+  }
+
+  private void verifyTokenFailWithRetry(DelegationTokenManager tm,
+      Token<DelegationTokenIdentifier> token, int retryCount)
+      throws IOException, InterruptedException {
     try {
-      tm1.verifyToken(token);
-      fail("Expected InvalidToken");
-    } catch (SecretManager.InvalidToken it) {
-      it.printStackTrace();
+      tm.verifyToken(token);
+    } catch (SecretManager.InvalidToken er) {
+      throw er;
+    }
+    if (retryCount > 0) {
+      Thread.sleep(RETRY_WAIT);
+      verifyTokenFailWithRetry(tm, token, retryCount - 1);
     }
   }
+
 }