You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/06/07 04:25:09 UTC
[hbase] branch branch-2.5 updated: HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture outside lock block (#4496)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 5cc614f23f9 HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture outside lock block (#4496)
5cc614f23f9 is described below
commit 5cc614f23f916fb713319099cd4dcbe341d23c71
Author: wangzhi <12...@qq.com>
AuthorDate: Tue Jun 7 12:17:52 2022 +0800
HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture outside lock block (#4496)
Signed-off-by: Duo Zhang <zh...@apache.org>
(cherry picked from commit 176c43c5ad1aab01eb2d2b05c0cb90132e8d19b1)
---
.../hbase/client/AsyncNonMetaRegionLocator.java | 54 ++++++++++++++++++----
1 file changed, 44 insertions(+), 10 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index df6c6b753ed..0009415142c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -35,10 +35,12 @@ import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -122,6 +124,26 @@ class AsyncNonMetaRegionLocator {
}
}
+  /**
+   * Pairs a locate future with the outcome it should eventually be completed with. Callers
+   * collect these while holding the table cache lock and invoke {@link #complete()} only after
+   * the lock has been released, so that callbacks registered on the future do not run inside the
+   * synchronized block.
+   */
+  private static final class RegionLocationsFutureResult {
+    private final CompletableFuture<RegionLocations> future;
+    private final RegionLocations result;
+    private final Throwable e;
+
+    /**
+     * @param future the future to be completed later by {@link #complete()}
+     * @param result the value to complete {@code future} with when {@code e} is null
+     * @param e the failure to complete {@code future} with, or null for a normal completion
+     */
+    public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
+      RegionLocations result, Throwable e) {
+      this.future = future;
+      this.result = result;
+      this.e = e;
+    }
+
+    /**
+     * Completes the wrapped future: exceptionally if an error was recorded, otherwise with the
+     * recorded result. Exactly one of the two completion calls is made.
+     */
+    public void complete() {
+      if (e != null) {
+        future.completeExceptionally(e);
+      } else {
+        // Only reached on the success path; previously this call also ran after an exceptional
+        // completion and was silently ignored by the already-completed future.
+        future.complete(result);
+      }
+    }
+  }
+
private static final class TableCache {
private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
@@ -148,18 +170,20 @@ class AsyncNonMetaRegionLocator {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}
- public void clearCompletedRequests(RegionLocations locations) {
+ public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
- if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
+ if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
iter.remove();
}
}
+ return futureResultList;
}
private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
- RegionLocations locations) {
+ RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
if (future.isDone()) {
return true;
}
@@ -185,7 +209,7 @@ class AsyncNonMetaRegionLocator {
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
- future.complete(locations);
+ futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
return true;
} else {
return false;
@@ -320,32 +344,36 @@ class AsyncNonMetaRegionLocator {
TableCache tableCache = getTableCache(tableName);
if (locs != null) {
RegionLocations addedLocs = addToCache(tableCache, locs);
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
- tableCache.clearCompletedRequests(addedLocs);
+ futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
} else {
// we meet an error
assert error != null;
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
// fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
// already retried several times
- CompletableFuture<?> future = tableCache.allRequests.remove(req);
+ CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
if (future != null) {
- future.completeExceptionally(error);
+ futureResultList.add(new RegionLocationsFutureResult(future, null, error));
}
- tableCache.clearCompletedRequests(null);
+ futureResultList.addAll(tableCache.clearCompletedRequests(null));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
}
}
@@ -543,9 +571,11 @@ class AsyncNonMetaRegionLocator {
continue;
}
RegionLocations addedLocs = addToCache(tableCache, locs);
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
- tableCache.clearCompletedRequests(addedLocs);
+ futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
}
}
}
@@ -677,12 +707,16 @@ class AsyncNonMetaRegionLocator {
if (tableCache == null) {
return;
}
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
if (!tableCache.allRequests.isEmpty()) {
IOException error = new IOException("Cache cleared");
- tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
+ tableCache.allRequests.values().forEach(f -> {
+ futureResultList.add(new RegionLocationsFutureResult(f, null, error));
+ });
}
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
conn.getConnectionMetrics()
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
}