You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/20 21:30:26 UTC
[29/50] [abbrv] hbase git commit: HBASE-18598
AsyncNonMetaRegionLocator use FIFO algorithm to get a candidate locate
request
HBASE-18598 AsyncNonMetaRegionLocator use FIFO algorithm to get a candidate locate request
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/59ffb611
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/59ffb611
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/59ffb611
Branch: refs/heads/HBASE-18467
Commit: 59ffb6119b2e4613bc8baec9a0738096184a3d92
Parents: 665fd0d
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Aug 15 16:15:29 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Wed Aug 16 13:08:40 2017 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncNonMetaRegionLocator.java | 119 ++++++++++---------
.../client/TestAsyncNonMetaRegionLocator.java | 1 +
2 files changed, 63 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/59ffb611/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 31f369c..ab1f0db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -29,18 +29,18 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -107,7 +107,7 @@ class AsyncNonMetaRegionLocator {
public final Set<LocateRequest> pendingRequests = new HashSet<>();
public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
- new HashMap<>();
+ new LinkedHashMap<>();
public boolean hasQuota(int max) {
return pendingRequests.size() < max;
@@ -120,6 +120,49 @@ class AsyncNonMetaRegionLocator {
public void send(LocateRequest req) {
pendingRequests.add(req);
}
+
+ public Optional<LocateRequest> getCandidate() {
+ return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
+ }
+
+ public void clearCompletedRequests(Optional<HRegionLocation> location) {
+ for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = allRequests
+ .entrySet().iterator(); iter.hasNext();) {
+ Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
+ if (tryComplete(entry.getKey(), entry.getValue(), location)) {
+ iter.remove();
+ }
+ }
+ }
+
+ private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
+ Optional<HRegionLocation> location) {
+ if (future.isDone()) {
+ return true;
+ }
+ if (!location.isPresent()) {
+ return false;
+ }
+ HRegionLocation loc = location.get();
+ boolean completed;
+ if (req.locateType.equals(RegionLocateType.BEFORE)) {
+ // for locating the row before current row, the common case is to find the previous region in
+ // reverse scan, so we check the endKey first. In general, the condition should be startKey <
+ // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
+ // && startKey < req.row). The two conditions are equal since startKey < endKey.
+ int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
+ completed =
+ c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
+ } else {
+ completed = loc.getRegionInfo().containsRow(req.row);
+ }
+ if (completed) {
+ future.complete(loc);
+ return true;
+ } else {
+ return false;
+ }
+ }
}
AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
@@ -186,48 +229,27 @@ class AsyncNonMetaRegionLocator {
}
}
- private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
- HRegionLocation loc) {
- if (future.isDone()) {
- return true;
- }
- boolean completed;
- if (req.locateType.equals(RegionLocateType.BEFORE)) {
- // for locating the row before current row, the common case is to find the previous region in
- // reverse scan, so we check the endKey first. In general, the condition should be startKey <
- // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
- // && startKey < req.row). The two conditions are equal since startKey < endKey.
- int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
- completed =
- c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
- } else {
- completed = loc.getRegionInfo().containsRow(req.row);
- }
- if (completed) {
- future.complete(loc);
- return true;
- } else {
- return false;
- }
- }
-
private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
Throwable error) {
if (error != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to locate region in '" + tableName + "', row='" +
- Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
- error);
- }
+ LOG.warn(
+ "Failed to locate region in '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
+ + "', locateType=" + req.locateType, error);
}
- LocateRequest toSend = null;
+ Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
if (loc != null) {
if (!addToCache(tableCache, loc)) {
// someone is ahead of us.
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
+ tableCache.clearCompletedRequests(Optional.empty());
+ // A completed locate request was just removed in this synchronized block, so the table
+ // cache is guaranteed to have quota to send a candidate request.
+ toSend = tableCache.getCandidate();
+ toSend.ifPresent(r -> tableCache.send(r));
}
+ toSend.ifPresent(r -> locateInMeta(tableName, r));
return;
}
}
@@ -239,30 +261,13 @@ class AsyncNonMetaRegionLocator {
future.completeExceptionally(error);
}
}
- if (loc != null) {
- for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
- tableCache.allRequests.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
- if (tryComplete(entry.getKey(), entry.getValue(), loc)) {
- iter.remove();
- }
- }
- }
- if (!tableCache.allRequests.isEmpty() &&
- tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
- LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
- .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new);
- if (candidates.length > 0) {
- // TODO: use a better algorithm to send a request which is more likely to fetch a new
- // location.
- toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
- tableCache.send(toSend);
- }
- }
- }
- if (toSend != null) {
- locateInMeta(tableName, toSend);
+ tableCache.clearCompletedRequests(Optional.ofNullable(loc));
+ // A completed locate request was just removed in this synchronized block, so the table
+ // cache is guaranteed to have quota to send a candidate request.
+ toSend = tableCache.getCandidate();
+ toSend.ifPresent(r -> tableCache.send(r));
}
+ toSend.ifPresent(r -> locateInMeta(tableName, r));
}
private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,
http://git-wip-us.apache.org/repos/asf/hbase/blob/59ffb611/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 0bb192b..80ed02e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -209,6 +209,7 @@ public class TestAsyncNonMetaRegionLocator {
throw new RuntimeException(e);
}
}));
+
LOCATOR.clearCache(TABLE_NAME);
byte[][] endKeys = getEndKeys();
IntStream.range(0, 2).forEach(