You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2022/02/24 16:23:27 UTC

[hbase] branch master updated: HBASE-26765 Minor refactor of async scanning code (#4121)

This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dc663e  HBASE-26765 Minor refactor of async scanning code (#4121)
5dc663e is described below

commit 5dc663ea388d64f9d583a9e8151da6b8d403b0fd
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Thu Feb 24 17:20:57 2022 +0100

    HBASE-26765 Minor refactor of async scanning code (#4121)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  2 +-
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |  5 ++---
 .../hbase/client/AsyncTableResultScanner.java      | 22 +++++++++++-----------
 .../hadoop/hbase/client/RawAsyncTableImpl.java     | 12 ++++++++----
 4 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 9ab1a1e..506962f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -362,7 +362,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
       public AsyncTable<ScanResultConsumer> build() {
         RawAsyncTableImpl rawTable =
           new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
-        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
+        return new AsyncTableImpl(rawTable, pool);
       }
     };
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index a124467..96c650f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -44,12 +44,11 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 @InterfaceAudience.Private
 class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
 
-  private final AsyncTable<AdvancedScanResultConsumer> rawTable;
+  private final RawAsyncTableImpl rawTable;
 
   private final ExecutorService pool;
 
-  AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable,
-      ExecutorService pool) {
+  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
     this.rawTable = rawTable;
     this.pool = pool;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 2858d2f..e9b15f9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -30,8 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
- * in background and cache it in memory. Typically the {@link #maxCacheSize} will be
+ * The {@link ResultScanner} implementation for {@link RawAsyncTableImpl}. It will fetch data
+ * automatically in background and cache it in memory. Typically, the {@link #maxCacheSize} will be
  * {@code 2 * scan.getMaxResultSize()}.
  */
 @InterfaceAudience.Private
@@ -39,7 +40,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);
 
-  private final AsyncTable<AdvancedScanResultConsumer> rawTable;
+  private final TableName tableName;
 
   private final long maxCacheSize;
 
@@ -57,12 +58,10 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
 
   private ScanResumer resumer;
 
-  public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
-      long maxCacheSize) {
-    this.rawTable = table;
+  public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
+    this.tableName = tableName;
     this.maxCacheSize = maxCacheSize;
     this.scan = scan;
-    table.scan(scan, this);
   }
 
   private void addToCache(Result result) {
@@ -72,9 +71,10 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
 
   private void stopPrefetch(ScanController controller) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
-        " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
-        cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
+      LOG.debug("{} stop prefetching when scanning {} as the cache size {}" +
+        " is greater than the maxCacheSize {}",
+        String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize,
+        maxCacheSize);
     }
     resumer = controller.suspend();
   }
@@ -138,7 +138,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
         return null;
       }
       if (error != null) {
-        FutureUtils.rethrow(error);
+        throw FutureUtils.rethrow(error);
       }
       try {
         wait();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 655ab23..a144550 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -628,10 +628,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   }
 
   @Override
-  public ResultScanner getScanner(Scan scan) {
-    return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
-      resultSize2CacheSize(
-        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+  public AsyncTableResultScanner getScanner(Scan scan) {
+    final long maxCacheSize = resultSize2CacheSize(
+      scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
+    final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
+    final AsyncTableResultScanner scanner =
+      new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
+    scan(scan, scanner);
+    return scanner;
   }
 
   @Override