You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/01/02 13:20:15 UTC
[hbase] branch master updated: HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new c811acd HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)
c811acd is described below
commit c811acdcab266cf014fa1e55236fa54a35c8d02a
Author: Yutong Xiao <yu...@gmail.com>
AuthorDate: Sun Jan 2 21:19:31 2022 +0800
HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/thrift2/ThriftHBaseServiceHandler.java | 36 +++++++++++++---------
.../thrift2/TestThriftHBaseServiceHandler.java | 27 ++++++++++++++++
2 files changed, 48 insertions(+), 15 deletions(-)
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
index ef6b8b0..19fee57 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.thrift2;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
@@ -50,9 +52,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.commons.lang3.NotImplementedException;
@@ -97,6 +98,9 @@ import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.TTableName;
import org.apache.hadoop.hbase.thrift2.generated.TThriftServerType;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener;
import org.apache.thrift.TException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -114,9 +118,8 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
// nextScannerId and scannerMap are used to manage scanner state
- // TODO: Cleanup thread for Scanners, Scanner id wrap
private final AtomicInteger nextScannerId = new AtomicInteger(0);
- private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
+ private final Cache<Integer, ResultScanner> scannerMap;
private static final IOException ioe
= new DoNotRetryIOException("Thrift Server is in Read-only mode.");
@@ -160,7 +163,14 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
public ThriftHBaseServiceHandler(final Configuration conf,
final UserProvider userProvider) throws IOException {
super(conf, userProvider);
+ long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);
+ scannerMap = CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalListener<Integer, ResultScanner>) removalNotification ->
+ removalNotification.getValue().close())
+ .build();
}
@Override
@@ -212,16 +222,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
* @return a Scanner, or null if the Id is invalid
*/
private ResultScanner getScanner(int id) {
- return scannerMap.get(id);
+ return scannerMap.getIfPresent(id);
}
/**
* Removes the scanner associated with the specified ID from the internal HashMap.
* @param id of the Scanner to remove
- * @return the removed Scanner, or <code>null</code> if the Id is invalid
*/
- protected ResultScanner removeScanner(int id) {
- return scannerMap.remove(id);
+ protected void removeScanner(int id) {
+ scannerMap.invalidate(id);
}
@Override
@@ -466,18 +475,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
return results;
}
-
-
@Override
public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
LOG.debug("scannerClose: id=" + scannerId);
ResultScanner scanner = getScanner(scannerId);
if (scanner == null) {
- String message = "scanner ID is invalid";
- LOG.warn(message);
- TIllegalArgument ex = new TIllegalArgument();
- ex.setMessage("Invalid scanner Id");
- throw ex;
+ LOG.warn("scanner ID: " + scannerId + "is invalid");
+ // While the scanner could be already expired,
+ // we should not throw exception here. Just log and return.
+ return;
}
scanner.close();
removeScanner(scannerId);
diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
index e84962d..1453f37 100644
--- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
+++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.thrift2;
import static java.nio.ByteBuffer.wrap;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.CLEANUP_INTERVAL;
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.MAX_IDLETIME;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
@@ -1041,6 +1043,31 @@ public class TestThriftHBaseServiceHandler {
}
@Test
+ public void testExpiredScanner() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1000);
+ ThriftHBaseServiceHandler handler =
+ new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
+
+ TScan scan = new TScan();
+ ByteBuffer table = wrap(tableAname);
+
+ int scannerId = handler.openScanner(table, scan);
+ handler.getScannerRows(scannerId, 1);
+ Thread.sleep(1000);
+
+ try {
+ handler.getScannerRows(scannerId, 1);
+ fail("The scanner should be expired and have an TIllegalArgument exception here.");
+ } catch (TIllegalArgument e) {
+ assertEquals("Invalid scanner Id", e.getMessage());
+ } finally {
+ conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+ }
+ }
+
+ @Test
public void testPutTTL() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = Bytes.toBytes("testPutTTL");