You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/06/23 20:12:41 UTC

[hadoop] branch trunk updated: HDFS-15383. RBF: Add support for router delegation token without watch (#2047)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84110d8  HDFS-15383. RBF: Add support for router delegation token without watch (#2047)
84110d8 is described below

commit 84110d850e2bc2a9ff4afcc7508fecd81cb5b7e5
Author: lfengnan <lf...@uber.com>
AuthorDate: Tue Jun 23 13:12:29 2020 -0700

    HDFS-15383. RBF: Add support for router delegation token without watch (#2047)
    
    Improves the router's performance for delegation-token-related operations by removing the router's watchers on tokens. In our experience, the huge number of watches inside ZooKeeper degrades ZooKeeper's performance severely. The current limit is about 1.2-1.5 million.
---
 .../AbstractDelegationTokenSecretManager.java      |   6 +-
 .../delegation/ZKDelegationTokenSecretManager.java | 141 ++++++++-----
 .../TestZKDelegationTokenSecretManager.java        |  12 +-
 .../token/ZKDelegationTokenSecretManagerImpl.java  | 174 ++++++++++++++-
 .../TestZKDelegationTokenSecretManagerImpl.java    | 234 +++++++++++++++++++++
 5 files changed, 498 insertions(+), 69 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index f329acc..3a22cee 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -23,11 +23,11 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.crypto.SecretKey;
 
@@ -63,7 +63,7 @@ extends AbstractDelegationTokenIdentifier>
    * to DelegationTokenInformation. Protected by this object lock.
    */
   protected final Map<TokenIdent, DelegationTokenInformation> currentTokens 
-      = new HashMap<TokenIdent, DelegationTokenInformation>();
+      = new ConcurrentHashMap<>();
   
   /**
    * Sequence number to create DelegationTokenIdentifier.
@@ -75,7 +75,7 @@ extends AbstractDelegationTokenIdentifier>
    * Access to allKeys is protected by this object lock
    */
   protected final Map<Integer, DelegationKey> allKeys 
-      = new HashMap<Integer, DelegationKey>();
+      = new ConcurrentHashMap<>();
   
   /**
    * Access to currentId is protected by this object lock.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index cd3b8c0..f50035d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import static org.apache.hadoop.util.Time.now;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -79,7 +80,7 @@ import com.google.common.base.Preconditions;
 public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
     extends AbstractDelegationTokenSecretManager<TokenIdent> {
 
-  private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
+  public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
   public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
       + "zkNumRetries";
   public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
@@ -100,6 +101,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       + "kerberos.principal";
   public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
       + "token.seqnum.batch.size";
+  public static final String ZK_DTSM_TOKEN_WATCHER_ENABLED = ZK_CONF_PREFIX
+      + "token.watcher.enabled";
+  public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true;
 
   public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
   public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
@@ -118,7 +122,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
   private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
   private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot";
-  private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
+  protected static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
   private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";
 
   private static final String DELEGATION_KEY_PREFIX = "DK_";
@@ -132,7 +136,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   }
 
   private final boolean isExternalClient;
-  private final CuratorFramework zkClient;
+  protected final CuratorFramework zkClient;
   private SharedCount delTokSeqCounter;
   private SharedCount keyIdSeqCounter;
   private PathChildrenCache keyCache;
@@ -143,6 +147,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   private int currentSeqNum;
   private int currentMaxSeqNum;
 
+  private final boolean isTokenWatcherEnabled;
+
   public ZKDelegationTokenSecretManager(Configuration conf) {
     super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
         DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
@@ -156,6 +162,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
         ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
     seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
         ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
+    isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
+        ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
     if (CURATOR_TL.get() != null) {
       zkClient =
           CURATOR_TL.get().usingNamespace(
@@ -383,34 +391,37 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     } catch (Exception e) {
       throw new IOException("Could not start PathChildrenCache for keys", e);
     }
-    try {
-      tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
-      if (tokenCache != null) {
-        tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
-        tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
-
-          @Override
-          public void childEvent(CuratorFramework client,
-              PathChildrenCacheEvent event) throws Exception {
-            switch (event.getType()) {
-            case CHILD_ADDED:
-              processTokenAddOrUpdate(event.getData());
-              break;
-            case CHILD_UPDATED:
-              processTokenAddOrUpdate(event.getData());
-              break;
-            case CHILD_REMOVED:
-              processTokenRemoved(event.getData());
-              break;
-            default:
-              break;
+    if (isTokenWatcherEnabled) {
+      LOG.info("TokenCache is enabled");
+      try {
+        tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
+        if (tokenCache != null) {
+          tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
+          tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
+
+            @Override
+            public void childEvent(CuratorFramework client,
+                                   PathChildrenCacheEvent event) throws Exception {
+              switch (event.getType()) {
+                case CHILD_ADDED:
+                  processTokenAddOrUpdate(event.getData().getData());
+                  break;
+                case CHILD_UPDATED:
+                  processTokenAddOrUpdate(event.getData().getData());
+                  break;
+                case CHILD_REMOVED:
+                  processTokenRemoved(event.getData());
+                  break;
+                default:
+                  break;
+              }
             }
-          }
-        }, listenerThreadPool);
-        loadFromZKCache(true);
+          }, listenerThreadPool);
+          loadFromZKCache(true);
+        }
+      } catch (Exception e) {
+        throw new IOException("Could not start PathChildrenCache for tokens", e);
       }
-    } catch (Exception e) {
-      throw new IOException("Could not start PathChildrenCache for tokens", e);
     }
     super.startThreads();
   }
@@ -435,7 +446,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     for (ChildData child : children) {
       try {
         if (isTokenCache) {
-          processTokenAddOrUpdate(child);
+          processTokenAddOrUpdate(child.getData());
         } else {
           processKeyAddOrUpdate(child.getData());
         }
@@ -457,9 +468,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     DataInputStream din = new DataInputStream(bin);
     DelegationKey key = new DelegationKey();
     key.readFields(din);
-    synchronized (this) {
-      allKeys.put(key.getKeyId(), key);
-    }
+    allKeys.put(key.getKeyId(), key);
   }
 
   private void processKeyRemoved(String path) {
@@ -469,15 +478,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       int j = tokSeg.indexOf('_');
       if (j > 0) {
         int keyId = Integer.parseInt(tokSeg.substring(j + 1));
-        synchronized (this) {
-          allKeys.remove(keyId);
-        }
+        allKeys.remove(keyId);
       }
     }
   }
 
-  private void processTokenAddOrUpdate(ChildData data) throws IOException {
-    ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
+  protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException {
+    ByteArrayInputStream bin = new ByteArrayInputStream(data);
     DataInputStream din = new DataInputStream(bin);
     TokenIdent ident = createIdentifier();
     ident.readFields(din);
@@ -488,12 +495,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     if (numRead > -1) {
       DelegationTokenInformation tokenInfo =
           new DelegationTokenInformation(renewDate, password);
-      synchronized (this) {
-        currentTokens.put(ident, tokenInfo);
-        // The cancel task might be waiting
-        notifyAll();
-      }
+      currentTokens.put(ident, tokenInfo);
+      return ident;
     }
+    return null;
   }
 
   private void processTokenRemoved(ChildData data) throws IOException {
@@ -501,11 +506,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     DataInputStream din = new DataInputStream(bin);
     TokenIdent ident = createIdentifier();
     ident.readFields(din);
-    synchronized (this) {
-      currentTokens.remove(ident);
-      // The cancel task might be waiting
-      notifyAll();
-    }
+    currentTokens.remove(ident);
   }
 
   @Override
@@ -706,7 +707,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
    *
    * @param ident Identifier of the token
    */
-  private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
+  protected void syncLocalCacheWithZk(TokenIdent ident) {
     try {
       DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident);
       if (tokenInfo != null && !currentTokens.containsKey(ident)) {
@@ -720,16 +721,21 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     }
   }
 
-  private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
+  protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
       throws IOException {
     return getTokenInfoFromZK(ident, false);
   }
 
-  private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
+  protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
       boolean quiet) throws IOException {
     String nodePath =
         getNodePath(ZK_DTSM_TOKENS_ROOT,
             DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
+    return getTokenInfoFromZK(nodePath, quiet);
+  }
+
+  protected DelegationTokenInformation getTokenInfoFromZK(String nodePath,
+      boolean quiet) throws IOException {
     try {
       byte[] data = zkClient.getData().forPath(nodePath);
       if ((data == null) || (data.length == 0)) {
@@ -864,15 +870,30 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   @Override
   protected void removeStoredToken(TokenIdent ident)
       throws IOException {
+    removeStoredToken(ident, false);
+  }
+
+  protected void removeStoredToken(TokenIdent ident,
+      boolean checkAgainstZkBeforeDeletion) throws IOException {
     String nodeRemovePath =
         getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
             + ident.getSequenceNumber());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing ZKDTSMDelegationToken_"
-          + ident.getSequenceNumber());
-    }
     try {
-      if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+      DelegationTokenInformation dtInfo = getTokenInfoFromZK(ident, true);
+      if (dtInfo != null) {
+        // For the case there is no sync or watch miss, it is possible that the
+        // local storage has expired tokens which have been renewed by peer
+        // so double check again to avoid accidental delete
+        if (checkAgainstZkBeforeDeletion
+            && dtInfo.getRenewDate() > now()) {
+          LOG.info("Node already renewed by peer " + nodeRemovePath +
+              " so this token should not be deleted");
+          return;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing ZKDTSMDelegationToken_"
+              + ident.getSequenceNumber());
+        }
         while(zkClient.checkExists().forPath(nodeRemovePath) != null){
           try {
             zkClient.delete().guaranteed().forPath(nodeRemovePath);
@@ -895,7 +916,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   }
 
   @Override
-  public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
+  public TokenIdent cancelToken(Token<TokenIdent> token,
       String canceller) throws IOException {
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
@@ -906,7 +927,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     return super.cancelToken(token, canceller);
   }
 
-  private void addOrUpdateToken(TokenIdent ident,
+  protected void addOrUpdateToken(TokenIdent ident,
       DelegationTokenInformation info, boolean isUpdate) throws Exception {
     String nodeCreatePath =
         getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
@@ -933,6 +954,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     }
   }
 
+  public boolean isTokenWatcherEnabled() {
+    return isTokenWatcherEnabled;
+  }
+
   /**
    * Simple implementation of an {@link ACLProvider} that simply returns an ACL
    * that gives all permissions only to a single principal.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
index b2e1779..643da6a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -59,15 +59,15 @@ public class TestZKDelegationTokenSecretManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class);
 
-  private static final int TEST_RETRIES = 2;
+  protected static final int TEST_RETRIES = 2;
 
-  private static final int RETRY_COUNT = 5;
+  protected static final int RETRY_COUNT = 5;
 
-  private static final int RETRY_WAIT = 1000;
+  protected static final int RETRY_WAIT = 1000;
 
-  private static final long DAY_IN_SECS = 86400;
+  protected static final long DAY_IN_SECS = 86400;
 
-  private TestingServer zkServer;
+  protected TestingServer zkServer;
 
   @Rule
   public Timeout globalTimeout = new Timeout(300000);
@@ -425,7 +425,7 @@ public class TestZKDelegationTokenSecretManager {
   // cancelled but.. that would mean having to make an RPC call for every
   // verification request.
   // Thus, the eventual consistency tradef-off should be acceptable here...
-  private void verifyTokenFail(DelegationTokenManager tm,
+  protected void verifyTokenFail(DelegationTokenManager tm,
       Token<DelegationTokenIdentifier> token) throws IOException,
       InterruptedException {
     verifyTokenFailWithRetry(tm, token, RETRY_COUNT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
index 4a11118..2d55026 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
@@ -19,13 +19,26 @@
 package org.apache.hadoop.hdfs.server.federation.router.security.token;
 
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Zookeeper based router delegation token store implementation.
@@ -33,16 +46,44 @@ import java.io.IOException;
 public class ZKDelegationTokenSecretManagerImpl extends
     ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
 
+  public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
+      ZK_CONF_PREFIX + "router.token.sync.interval";
+  public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
 
-  private Configuration conf = null;
+  private Configuration conf;
+
+  private final ScheduledExecutorService scheduler =
+      Executors.newSingleThreadScheduledExecutor();
+
+  // Local cache of delegation tokens, used for deprecating tokens from
+  // currentTokenMap
+  private final Set<AbstractDelegationTokenIdentifier> localTokenCache =
+      new HashSet<>();
+  // Native zk client for getting all tokens
+  private ZooKeeper zookeeper;
+  private final String TOKEN_PATH = "/" + zkClient.getNamespace()
+      + ZK_DTSM_TOKENS_ROOT;
+  // The flag used to issue an extra check before deletion
+  // Since cancel token and token remover thread use the same
+  // API here and one router could have a token that is renewed
+  // by another router, thus token remover should always check ZK
+  // to confirm whether it has been renewed or not
+  private ThreadLocal<Boolean> checkAgainstZkBeforeDeletion =
+      new ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+          return true;
+        }
+      };
 
   public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
     super(conf);
     this.conf = conf;
     try {
-      super.startThreads();
+      startThreads();
     } catch (IOException e) {
       LOG.error("Error starting threads for zkDelegationTokens", e);
     }
@@ -50,7 +91,136 @@ public class ZKDelegationTokenSecretManagerImpl extends
   }
 
   @Override
+  public void startThreads() throws IOException {
+    super.startThreads();
+    // start token cache related work when watcher is disabled
+    if (!isTokenWatcherEnabled()) {
+      LOG.info("Watcher for tokens is disabled in this secret manager");
+      try {
+        // By default set this variable
+        checkAgainstZkBeforeDeletion.set(true);
+        // Ensure the token root path exists
+        if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) {
+          zkClient.create().creatingParentsIfNeeded()
+              .withMode(CreateMode.PERSISTENT)
+              .forPath(ZK_DTSM_TOKENS_ROOT);
+        }
+        // Set up zookeeper client
+        try {
+          zookeeper = zkClient.getZookeeperClient().getZooKeeper();
+        } catch (Exception e) {
+          LOG.info("Cannot get zookeeper client ", e);
+        } finally {
+          if (zookeeper == null) {
+            throw new IOException("Zookeeper client is null");
+          }
+        }
+
+        LOG.info("Start loading token cache");
+        long start = Time.now();
+        rebuildTokenCache(true);
+        LOG.info("Loaded token cache in {} milliseconds", Time.now() - start);
+
+        int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL,
+            ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT);
+        scheduler.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              rebuildTokenCache(false);
+            } catch (Exception e) {
+              // ignore
+            }
+          }
+        }, syncInterval, syncInterval, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.error("Error rebuilding local cache for zkDelegationTokens ", e);
+      }
+    }
+  }
+
+  @Override
+  public void stopThreads() {
+    super.stopThreads();
+    scheduler.shutdown();
+  }
+
+  @Override
   public DelegationTokenIdentifier createIdentifier() {
     return new DelegationTokenIdentifier();
   }
+
+  /**
+   * This function will rebuild local token cache from zk storage.
+   * It is first called when the secret manager is initialized and
+   * then regularly at a configured interval.
+   *
+   * @param initial whether this is called during initialization
+   * @throws IOException
+   */
+  private void rebuildTokenCache(boolean initial) throws IOException {
+    localTokenCache.clear();
+    // Use bare zookeeper client to get all children since curator will
+    // wrap the same API with a sorting process. This is time consuming given
+    // millions of tokens
+    List<String> zkTokens;
+    try {
+      zkTokens = zookeeper.getChildren(TOKEN_PATH, false);
+    } catch (KeeperException | InterruptedException e) {
+      throw new IOException("Tokens cannot be fetched from path "
+          + TOKEN_PATH, e);
+    }
+    byte[] data;
+    for (String tokenPath : zkTokens) {
+      try {
+        data = zkClient.getData().forPath(
+            ZK_DTSM_TOKENS_ROOT + "/" + tokenPath);
+      } catch (KeeperException.NoNodeException e) {
+        LOG.debug("No node in path [" + tokenPath + "]");
+        continue;
+      } catch (Exception ex) {
+        throw new IOException(ex);
+      }
+      // Store data to currentTokenMap
+      AbstractDelegationTokenIdentifier ident = processTokenAddOrUpdate(data);
+      // Store data to localTokenCache for sync
+      localTokenCache.add(ident);
+    }
+    if (!initial) {
+      // Sync zkTokens with local cache, specifically
+      // 1) add/update tokens to local cache from zk, which is done through
+      //    processTokenAddOrUpdate above
+      // 2) remove tokens in local cache but not in zk anymore
+      for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) {
+        if (!localTokenCache.contains(ident)) {
+            currentTokens.remove(ident);
+        }
+      }
+    }
+  }
+
+  @Override
+  public AbstractDelegationTokenIdentifier cancelToken(
+      Token<AbstractDelegationTokenIdentifier> token, String canceller)
+      throws IOException {
+    checkAgainstZkBeforeDeletion.set(false);
+    AbstractDelegationTokenIdentifier ident = super.cancelToken(token,
+        canceller);
+    checkAgainstZkBeforeDeletion.set(true);
+    return ident;
+  }
+
+  @Override
+  protected void removeStoredToken(AbstractDelegationTokenIdentifier ident)
+      throws IOException {
+    super.removeStoredToken(ident, checkAgainstZkBeforeDeletion.get());
+  }
+
+  @Override
+  protected void addOrUpdateToken(AbstractDelegationTokenIdentifier ident,
+      DelegationTokenInformation info, boolean isUpdate) throws Exception {
+    // Store the data in local memory first
+    currentTokens.put(ident, info);
+    super.addOrUpdateToken(ident, info, isUpdate);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java
new file mode 100644
index 0000000..3c7f8e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.security.token;
+
+import static org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl.ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL;
+import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_WATCHER_ENABLED;
+import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.REMOVAL_SCAN_INTERVAL;
+import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.RENEW_INTERVAL;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.TestZKDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestZKDelegationTokenSecretManagerImpl
+    extends TestZKDelegationTokenSecretManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestZKDelegationTokenSecretManagerImpl.class);
+
+  /**
+   * Runs the basic token life cycle across two secret managers built from
+   * the same configuration with token watchers disabled: a token created on
+   * one manager is verified and renewed on the other, and after it is
+   * cancelled the other manager must reject it. The scenario is then
+   * repeated with the roles of the two managers swapped.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMultiNodeOperationWithoutWatch() throws Exception {
+    Configuration conf = getSecretConf(zkServer.getConnectString());
+    // disable watch
+    conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
+    conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3);
+
+    for (int attempt = 0; attempt < TEST_RETRIES; attempt++) {
+      ZKDelegationTokenSecretManagerImpl firstSecretManager =
+          new ZKDelegationTokenSecretManagerImpl(conf);
+      ZKDelegationTokenSecretManagerImpl secondSecretManager =
+          new ZKDelegationTokenSecretManagerImpl(conf);
+      DelegationTokenManager tm1 =
+          new DelegationTokenManager(conf, new Text("bla"));
+      tm1.setExternalDelegationTokenSecretManager(firstSecretManager);
+      DelegationTokenManager tm2 =
+          new DelegationTokenManager(conf, new Text("bla"));
+      tm2.setExternalDelegationTokenSecretManager(secondSecretManager);
+
+      // common token operation without watchers should still be working:
+      // created on tm1, used on tm2, cancelled on tm1
+      Token<DelegationTokenIdentifier> fooToken =
+          (Token<DelegationTokenIdentifier>) tm1.createToken(
+              UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(fooToken);
+      tm2.verifyToken(fooToken);
+      tm2.renewToken(fooToken, "foo");
+      tm1.verifyToken(fooToken);
+      tm1.cancelToken(fooToken, "foo");
+      try {
+        verifyTokenFail(tm2, fooToken);
+        fail("Expected InvalidToken");
+      } catch (SecretManager.InvalidToken expected) {
+        // The cancelled token is supposed to be rejected
+      }
+
+      // same scenario with the managers swapped
+      Token<DelegationTokenIdentifier> barToken =
+          (Token<DelegationTokenIdentifier>) tm2.createToken(
+              UserGroupInformation.getCurrentUser(), "bar");
+      Assert.assertNotNull(barToken);
+      tm1.verifyToken(barToken);
+      tm1.renewToken(barToken, "bar");
+      tm2.verifyToken(barToken);
+      tm2.cancelToken(barToken, "bar");
+      try {
+        verifyTokenFail(tm1, barToken);
+        fail("Expected InvalidToken");
+      } catch (SecretManager.InvalidToken expected) {
+        // The cancelled token is supposed to be rejected
+      }
+
+      firstSecretManager.stopThreads();
+      secondSecretManager.stopThreads();
+      verifyDestroy(tm1, conf);
+      verifyDestroy(tm2, conf);
+    }
+  }
+
+  /**
+   * With token watchers disabled and a sync interval (3) shorter than the
+   * renew interval (10), a renewal performed through one manager is expected
+   * to reach the other manager via the periodic sync, so the token still
+   * verifies on both managers after its original expiry time has passed.
+   */
+  // The cast of the created token to Token<DelegationTokenIdentifier> is
+  // unchecked, same as in testMultiNodeOperationWithoutWatch.
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMultiNodeTokenRemovalShortSyncWithoutWatch()
+      throws Exception {
+    String connectString = zkServer.getConnectString();
+    Configuration conf = getSecretConf(connectString);
+    // disable watch
+    conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
+    // make sync quick
+    conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3);
+    // set the renew window and removal interval to be a
+    // short time to trigger the background cleanup
+    conf.setInt(RENEW_INTERVAL, 10);
+    conf.setInt(REMOVAL_SCAN_INTERVAL, 10);
+
+    for (int i = 0; i < TEST_RETRIES; i++) {
+      ZKDelegationTokenSecretManagerImpl dtsm1 =
+          new ZKDelegationTokenSecretManagerImpl(conf);
+      ZKDelegationTokenSecretManagerImpl dtsm2 =
+          new ZKDelegationTokenSecretManagerImpl(conf);
+      DelegationTokenManager tm1, tm2;
+      tm1 = new DelegationTokenManager(conf, new Text("bla"));
+      tm1.setExternalDelegationTokenSecretManager(dtsm1);
+      tm2 = new DelegationTokenManager(conf, new Text("bla"));
+      tm2.setExternalDelegationTokenSecretManager(dtsm2);
+
+      // time: X
+      // token expiry time:
+      //   tm1: X + 10
+      //   tm2: X + 10
+      Token<DelegationTokenIdentifier> token =
+          (Token<DelegationTokenIdentifier>) tm1.createToken(
+              UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(token);
+      tm2.verifyToken(token);
+
+      // time: X + 9
+      // token expiry time:
+      //   tm1: X + 10
+      //   tm2: X + 19
+      Thread.sleep(9 * 1000);
+      tm2.renewToken(token, "foo");
+      tm1.verifyToken(token);
+
+      // time: X + 13
+      // token expiry time: (sync happened)
+      //   tm1: X + 19
+      //   tm2: X + 19
+      Thread.sleep(4 * 1000);
+      tm1.verifyToken(token);
+      tm2.verifyToken(token);
+
+      dtsm1.stopThreads();
+      dtsm2.stopThreads();
+      verifyDestroy(tm1, conf);
+      verifyDestroy(tm2, conf);
+    }
+  }
+
+  /**
+   * With token watchers disabled and a sync interval (20) longer than the
+   * renew interval (10), no periodic sync is expected while the test runs.
+   * A token renewed through one manager must still verify on that manager,
+   * and on a third manager that has never seen the token and therefore has
+   * to fetch it from ZooKeeper.
+   */
+  // This is very unlikely to happen in a real deployment, but it is worth
+  // covering the case.
+  // The cast of the created token to Token<DelegationTokenIdentifier> is
+  // unchecked, same as in testMultiNodeOperationWithoutWatch.
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMultiNodeTokenRemovalLongSyncWithoutWatch()
+      throws Exception {
+    String connectString = zkServer.getConnectString();
+    Configuration conf = getSecretConf(connectString);
+    // disable watch
+    conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
+    // make sync slow so that no periodic sync happens during the test
+    conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 20);
+    // set the renew window and removal interval to be a
+    // short time to trigger the background cleanup
+    conf.setInt(RENEW_INTERVAL, 10);
+    conf.setInt(REMOVAL_SCAN_INTERVAL, 10);
+
+    for (int i = 0; i < TEST_RETRIES; i++) {
+      ZKDelegationTokenSecretManagerImpl dtsm1 =
+          new ZKDelegationTokenSecretManagerImpl(conf);
+      ZKDelegationTokenSecretManagerImpl dtsm2 =
+          new ZKDelegationTokenSecretManagerImpl(conf);
+      ZKDelegationTokenSecretManagerImpl dtsm3 =
+          new ZKDelegationTokenSecretManagerImpl(conf);
+      DelegationTokenManager tm1, tm2, tm3;
+      tm1 = new DelegationTokenManager(conf, new Text("bla"));
+      tm1.setExternalDelegationTokenSecretManager(dtsm1);
+      tm2 = new DelegationTokenManager(conf, new Text("bla"));
+      tm2.setExternalDelegationTokenSecretManager(dtsm2);
+      tm3 = new DelegationTokenManager(conf, new Text("bla"));
+      tm3.setExternalDelegationTokenSecretManager(dtsm3);
+
+      // time: X
+      // token expiry time:
+      //   tm1: X + 10
+      //   tm2: X + 10
+      //   tm3: No token due to no sync
+      Token<DelegationTokenIdentifier> token =
+          (Token<DelegationTokenIdentifier>) tm1.createToken(
+              UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(token);
+      tm2.verifyToken(token);
+
+      // time: X + 9
+      // token expiry time:
+      //   tm1: X + 10
+      //   tm2: X + 19
+      //   tm3: No token due to no sync
+      Thread.sleep(9 * 1000);
+      long renewalTime = tm2.renewToken(token, "foo");
+      LOG.info("Renew for token {} at current time {} renewal time {}",
+          token.getIdentifier(), Time.formatTime(Time.now()),
+          Time.formatTime(renewalTime));
+      tm1.verifyToken(token);
+
+      // time: X + 13
+      // token expiry time: (sync didn't happen)
+      //   tm1: X + 10
+      //   tm2: X + 19
+      //   tm3: X + 19 due to fetch from zk
+      Thread.sleep(4 * 1000);
+      tm2.verifyToken(token);
+      tm3.verifyToken(token);
+
+      dtsm1.stopThreads();
+      dtsm2.stopThreads();
+      dtsm3.stopThreads();
+      verifyDestroy(tm1, conf);
+      verifyDestroy(tm2, conf);
+      verifyDestroy(tm3, conf);
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org