You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/06/07 04:24:36 UTC

[hbase] branch branch-2.4 updated: HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture outside lock block (#4496)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 3d82d2d9e78 HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture outside lock block (#4496)
3d82d2d9e78 is described below

commit 3d82d2d9e7870e1aa14aa6c0a849e764a51547e3
Author: wangzhi <12...@qq.com>
AuthorDate: Tue Jun 7 12:17:52 2022 +0800

    HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture outside lock block (#4496)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    (cherry picked from commit 176c43c5ad1aab01eb2d2b05c0cb90132e8d19b1)
---
 .../hbase/client/AsyncNonMetaRegionLocator.java    | 54 ++++++++++++++++++----
 1 file changed, 44 insertions(+), 10 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index df6c6b753ed..0009415142c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -35,10 +35,12 @@ import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -122,6 +124,26 @@ class AsyncNonMetaRegionLocator {
     }
   }
 
+  private static final class RegionLocationsFutureResult {
+    private final CompletableFuture<RegionLocations> future;
+    private final RegionLocations result;
+    private final Throwable e;
+
+    public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
+      RegionLocations result, Throwable e) {
+      this.future = future;
+      this.result = result;
+      this.e = e;
+    }
+
+    public void complete() {
+      if (e != null) {
+        future.completeExceptionally(e);
+      }
+      future.complete(result);
+    }
+  }
+
   private static final class TableCache {
 
     private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
@@ -148,18 +170,20 @@ class AsyncNonMetaRegionLocator {
       return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
     }
 
-    public void clearCompletedRequests(RegionLocations locations) {
+    public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
+      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
       for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
         allRequests.entrySet().iterator(); iter.hasNext();) {
         Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
-        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
+        if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
           iter.remove();
         }
       }
+      return futureResultList;
     }
 
     private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
-      RegionLocations locations) {
+      RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
       if (future.isDone()) {
         return true;
       }
@@ -185,7 +209,7 @@ class AsyncNonMetaRegionLocator {
         completed = loc.getRegion().containsRow(req.row);
       }
       if (completed) {
-        future.complete(locations);
+        futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
         return true;
       } else {
         return false;
@@ -320,32 +344,36 @@ class AsyncNonMetaRegionLocator {
     TableCache tableCache = getTableCache(tableName);
     if (locs != null) {
       RegionLocations addedLocs = addToCache(tableCache, locs);
+      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
       synchronized (tableCache) {
         tableCache.pendingRequests.remove(req);
-        tableCache.clearCompletedRequests(addedLocs);
+        futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
         // Remove a complete locate request in a synchronized block, so the table cache must have
         // quota to send a candidate request.
         toSend = tableCache.getCandidate();
         toSend.ifPresent(r -> tableCache.send(r));
       }
+      futureResultList.forEach(RegionLocationsFutureResult::complete);
       toSend.ifPresent(r -> locateInMeta(tableName, r));
     } else {
       // we meet an error
       assert error != null;
+      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
       synchronized (tableCache) {
         tableCache.pendingRequests.remove(req);
         // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
         // already retried several times
-        CompletableFuture<?> future = tableCache.allRequests.remove(req);
+        CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
         if (future != null) {
-          future.completeExceptionally(error);
+          futureResultList.add(new RegionLocationsFutureResult(future, null, error));
         }
-        tableCache.clearCompletedRequests(null);
+        futureResultList.addAll(tableCache.clearCompletedRequests(null));
         // Remove a complete locate request in a synchronized block, so the table cache must have
         // quota to send a candidate request.
         toSend = tableCache.getCandidate();
         toSend.ifPresent(r -> tableCache.send(r));
       }
+      futureResultList.forEach(RegionLocationsFutureResult::complete);
       toSend.ifPresent(r -> locateInMeta(tableName, r));
     }
   }
@@ -543,9 +571,11 @@ class AsyncNonMetaRegionLocator {
               continue;
             }
             RegionLocations addedLocs = addToCache(tableCache, locs);
+            List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
             synchronized (tableCache) {
-              tableCache.clearCompletedRequests(addedLocs);
+              futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
             }
+            futureResultList.forEach(RegionLocationsFutureResult::complete);
           }
         }
       }
@@ -677,12 +707,16 @@ class AsyncNonMetaRegionLocator {
     if (tableCache == null) {
       return;
     }
+    List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
     synchronized (tableCache) {
       if (!tableCache.allRequests.isEmpty()) {
         IOException error = new IOException("Cache cleared");
-        tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
+        tableCache.allRequests.values().forEach(f -> {
+          futureResultList.add(new RegionLocationsFutureResult(f, null, error));
+        });
       }
     }
+    futureResultList.forEach(RegionLocationsFutureResult::complete);
     conn.getConnectionMetrics()
       .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
   }