You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/01/02 13:20:15 UTC

[hbase] branch master updated: HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new c811acd  HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)
c811acd is described below

commit c811acdcab266cf014fa1e55236fa54a35c8d02a
Author: Yutong Xiao <yu...@gmail.com>
AuthorDate: Sun Jan 2 21:19:31 2022 +0800

    HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/thrift2/ThriftHBaseServiceHandler.java   | 36 +++++++++++++---------
 .../thrift2/TestThriftHBaseServiceHandler.java     | 27 ++++++++++++++++
 2 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
index ef6b8b0..19fee57 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.thrift2;
 
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
 import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
 import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
@@ -50,9 +52,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import org.apache.commons.lang3.NotImplementedException;
@@ -97,6 +98,9 @@ import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
 import org.apache.hadoop.hbase.thrift2.generated.TTableName;
 import org.apache.hadoop.hbase.thrift2.generated.TThriftServerType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener;
 import org.apache.thrift.TException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -114,9 +118,8 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
   private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
 
   // nextScannerId and scannerMap are used to manage scanner state
-  // TODO: Cleanup thread for Scanners, Scanner id wrap
   private final AtomicInteger nextScannerId = new AtomicInteger(0);
-  private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
+  private final Cache<Integer, ResultScanner> scannerMap;
 
   private static final IOException ioe
       = new DoNotRetryIOException("Thrift Server is in Read-only mode.");
@@ -160,7 +163,14 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
   public ThriftHBaseServiceHandler(final Configuration conf,
       final UserProvider userProvider) throws IOException {
     super(conf, userProvider);
+    long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
     isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);
+    scannerMap = CacheBuilder.newBuilder()
+      .expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
+      .removalListener((RemovalListener<Integer, ResultScanner>) removalNotification ->
+          removalNotification.getValue().close())
+      .build();
   }
 
   @Override
@@ -212,16 +222,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
    * @return a Scanner, or null if the Id is invalid
    */
   private ResultScanner getScanner(int id) {
-    return scannerMap.get(id);
+    return scannerMap.getIfPresent(id);
   }
 
   /**
    * Removes the scanner associated with the specified ID from the internal HashMap.
    * @param id of the Scanner to remove
-   * @return the removed Scanner, or <code>null</code> if the Id is invalid
    */
-  protected ResultScanner removeScanner(int id) {
-    return scannerMap.remove(id);
+  protected void removeScanner(int id) {
+    scannerMap.invalidate(id);
   }
 
   @Override
@@ -466,18 +475,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
     return results;
   }
 
-
-
   @Override
   public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
     LOG.debug("scannerClose: id=" + scannerId);
     ResultScanner scanner = getScanner(scannerId);
     if (scanner == null) {
-      String message = "scanner ID is invalid";
-      LOG.warn(message);
-      TIllegalArgument ex = new TIllegalArgument();
-      ex.setMessage("Invalid scanner Id");
-      throw ex;
+      LOG.warn("scanner ID: " + scannerId + "is invalid");
+      // While the scanner could be already expired,
+      // we should not throw exception here. Just log and return.
+      return;
     }
     scanner.close();
     removeScanner(scannerId);
diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
index e84962d..1453f37 100644
--- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
+++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.thrift2;
 
 import static java.nio.ByteBuffer.wrap;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
 import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.CLEANUP_INTERVAL;
 import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.MAX_IDLETIME;
 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
@@ -1041,6 +1043,31 @@ public class TestThriftHBaseServiceHandler {
   }
 
   @Test
+  public void testExpiredScanner() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1000);
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
+
+    TScan scan = new TScan();
+    ByteBuffer table = wrap(tableAname);
+
+    int scannerId = handler.openScanner(table, scan);
+    handler.getScannerRows(scannerId, 1);
+    Thread.sleep(1000);
+
+    try {
+      handler.getScannerRows(scannerId, 1);
+      fail("The scanner should be expired and have an TIllegalArgument exception here.");
+    } catch (TIllegalArgument e) {
+      assertEquals("Invalid scanner Id", e.getMessage());
+    } finally {
+      conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+        DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+    }
+  }
+
+  @Test
   public void testPutTTL() throws Exception {
     ThriftHBaseServiceHandler handler = createHandler();
     byte[] rowName = Bytes.toBytes("testPutTTL");