You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/06/10 00:47:17 UTC

[hbase] branch master updated: HBASE-24403 FsDelegationToken Should Cache Token After Acquired A New One (#1743)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b396e9  HBASE-24403 FsDelegationToken Should Cache Token After Acquired A New One (#1743)
7b396e9 is described below

commit 7b396e9b8ca93361de6a6c4bc8a40442db77c4da
Author: Vico.Wu <58...@qq.com>
AuthorDate: Wed Jun 10 08:46:58 2020 +0800

    HBASE-24403 FsDelegationToken Should Cache Token After Acquired A New One (#1743)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
 .../hbase/security/token/FsDelegationToken.java    | 26 +++++++++++++++++++---
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      |  8 ++++++-
 .../security/token/TestFsDelegationToken.java      | 24 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
index 9a58006..06f3cf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
@@ -50,6 +50,8 @@ public class FsDelegationToken {
   private boolean hasForwardedToken = false;
   private Token<?> userToken = null;
   private FileSystem fs = null;
+  private long tokenExpireTime = -1L;
+  private long renewAheadTime = Long.MAX_VALUE;
 
   /*
    * @param renewer the account name that is allowed to renew the token.
@@ -60,6 +62,17 @@ public class FsDelegationToken {
   }
 
   /**
+   * @param renewer the account name that is allowed to renew the token.
+   * @param renewAheadTime how long in millis
+   */
+  public FsDelegationToken(final UserProvider userProvider, final String renewer,
+    long renewAheadTime) {
+    this.userProvider = userProvider;
+    this.renewer = renewer;
+    this.renewAheadTime = renewAheadTime;
+  }
+
+  /**
    * Acquire the delegation token for the specified filesystem.
    * Before requesting a new delegation token, tries to find one already available.
    * Currently supports checking existing delegation tokens for swebhdfs, webhdfs and hdfs.
@@ -100,13 +113,20 @@ public class FsDelegationToken {
     if (userProvider.isHadoopSecurityEnabled()) {
       this.fs = fs;
       userToken = userProvider.getCurrent().getToken(tokenKind, fs.getCanonicalServiceName());
-      if (userToken == null) {
+      //We should acquire token when never acquired before or token is expiring or already expired
+      if (userToken == null || tokenExpireTime <= 0
+        || System.currentTimeMillis() > tokenExpireTime - renewAheadTime) {
         hasForwardedToken = false;
         try {
           userToken = fs.getDelegationToken(renewer);
-        } catch (NullPointerException npe) {
+          //After acquired the new token,we quickly renew it to get the token expiration
+          //time to confirm to renew it before expiration
+          tokenExpireTime = userToken.renew(fs.getConf());
+          LOG.debug("Acquired new token " + userToken + ". Expiration time: " + tokenExpireTime);
+          userProvider.getCurrent().addToken(userToken);
+        } catch (InterruptedException | NullPointerException e) {
           // we need to handle NullPointerException in case HADOOP-10009 is missing
-          LOG.error("Failed to get token for " + renewer);
+          LOG.error("Failed to get token for " + renewer, e);
         }
       } else {
         hasForwardedToken = true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index e8b701b..7779f28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -125,6 +125,10 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
    */
   public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
 
+  //HDFS DelegationToken is cached and should be renewed before token expiration
+  public static final String BULK_LOAD_RENEW_TOKEN_TIME_BUFFER
+    = "hbase.bulkload.renew.token.time.buffer";
+
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
   static final String TMP_DIR = ".tmp";
@@ -142,6 +146,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   private List<String> clusterIds = new ArrayList<>();
   private boolean replicate = true;
+  private final long retryAheadTime;
 
   public BulkLoadHFilesTool(Configuration conf) {
     // make a copy, just to be sure we're not overriding someone else's config
@@ -149,7 +154,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     // disable blockcache for tool invocation, see HBASE-10500
     conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
     userProvider = UserProvider.instantiate(conf);
-    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+    retryAheadTime = conf.getLong(BULK_LOAD_RENEW_TOKEN_TIME_BUFFER, 60000L);
+    fsDelegationToken = new FsDelegationToken(userProvider, "renewer", retryAheadTime);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
     nrThreads = conf.getInt("hbase.loadincremental.threads.max",
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java
index 81347c7..d6a3c66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenId
 import static org.apache.hadoop.hdfs.web.WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
 import static org.apache.hadoop.hdfs.web.WebHdfsConstants.WEBHDFS_TOKEN_KIND;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -81,6 +82,9 @@ public class TestFsDelegationToken {
     when(hdfsToken.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
     when(webhdfsToken.getKind()).thenReturn(WEBHDFS_TOKEN_KIND);
     when(swebhdfsToken.getKind()).thenReturn(SWEBHDFS_TOKEN_KIND);
+    when(fileSystem.getDelegationToken("Renewer")).thenReturn(hdfsToken);
+    when(webHdfsFileSystem.getDelegationToken("Renewer")).thenReturn(webhdfsToken);
+    when(swebHdfsFileSystem.getDelegationToken("Renewer")).thenReturn(swebhdfsToken);
   }
 
   @Test
@@ -88,6 +92,10 @@ public class TestFsDelegationToken {
     fsDelegationToken.acquireDelegationToken(fileSystem);
     assertEquals(
         fsDelegationToken.getUserToken().getKind(), HDFS_DELEGATION_KIND);
+    assertNotNull(
+      "HDFS Token should exist in cache after acquired",
+      userProvider.getCurrent()
+        .getToken(HDFS_DELEGATION_KIND.toString(), fileSystem.getCanonicalServiceName()));
   }
 
   @Test
@@ -95,6 +103,10 @@ public class TestFsDelegationToken {
     fsDelegationToken.acquireDelegationToken(webHdfsFileSystem);
     assertEquals(
         fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND);
+    assertNotNull(
+      "Webhdfs token should exist in cache after acquired",
+      userProvider.getCurrent()
+        .getToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem.getCanonicalServiceName()));
   }
 
   @Test
@@ -102,6 +114,10 @@ public class TestFsDelegationToken {
     fsDelegationToken.acquireDelegationToken(swebHdfsFileSystem);
     assertEquals(
         fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND);
+    assertNotNull(
+      "Swebhdfs token should exist in cache after acquired",
+      userProvider.getCurrent()
+        .getToken(SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem.getCanonicalServiceName()));
   }
 
   @Test(expected = NullPointerException.class)
@@ -114,6 +130,10 @@ public class TestFsDelegationToken {
     fsDelegationToken
         .acquireDelegationToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem);
     assertEquals(fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND);
+    assertNotNull(
+      "Webhdfs token should exist in cache after acquired",
+      userProvider.getCurrent()
+        .getToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem.getCanonicalServiceName()));
   }
 
   @Test
@@ -122,5 +142,9 @@ public class TestFsDelegationToken {
         .acquireDelegationToken(
             SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem);
     assertEquals(fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND);
+    assertNotNull(
+      "Swebhdfs token should exist in cache after acquired",
+      userProvider.getCurrent()
+        .getToken(SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem.getCanonicalServiceName()));
   }
 }