You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/20 21:30:26 UTC

[29/50] [abbrv] hbase git commit: HBASE-18598 AsyncNonMetaRegionLocator use FIFO algorithm to get a candidate locate request

HBASE-18598 AsyncNonMetaRegionLocator use FIFO algorithm to get a candidate locate request


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/59ffb611
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/59ffb611
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/59ffb611

Branch: refs/heads/HBASE-18467
Commit: 59ffb6119b2e4613bc8baec9a0738096184a3d92
Parents: 665fd0d
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Aug 15 16:15:29 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Wed Aug 16 13:08:40 2017 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncNonMetaRegionLocator.java | 119 ++++++++++---------
 .../client/TestAsyncNonMetaRegionLocator.java   |   1 +
 2 files changed, 63 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/59ffb611/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 31f369c..ab1f0db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -29,18 +29,18 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -107,7 +107,7 @@ class AsyncNonMetaRegionLocator {
     public final Set<LocateRequest> pendingRequests = new HashSet<>();
 
     public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
-        new HashMap<>();
+        new LinkedHashMap<>();
 
     public boolean hasQuota(int max) {
       return pendingRequests.size() < max;
@@ -120,6 +120,49 @@ class AsyncNonMetaRegionLocator {
     public void send(LocateRequest req) {
       pendingRequests.add(req);
     }
+
+    public Optional<LocateRequest> getCandidate() {
+      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
+    }
+
+    public void clearCompletedRequests(Optional<HRegionLocation> location) {
+      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = allRequests
+          .entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
+        if (tryComplete(entry.getKey(), entry.getValue(), location)) {
+          iter.remove();
+        }
+      }
+    }
+
+    private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
+        Optional<HRegionLocation> location) {
+      if (future.isDone()) {
+        return true;
+      }
+      if (!location.isPresent()) {
+        return false;
+      }
+      HRegionLocation loc = location.get();
+      boolean completed;
+      if (req.locateType.equals(RegionLocateType.BEFORE)) {
+        // for locating the row before current row, the common case is to find the previous region in
+        // reverse scan, so we check the endKey first. In general, the condition should be startKey <
+        // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
+        // && startKey < req.row). The two conditions are equal since startKey < endKey.
+        int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
+        completed =
+            c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
+      } else {
+        completed = loc.getRegionInfo().containsRow(req.row);
+      }
+      if (completed) {
+        future.complete(loc);
+        return true;
+      } else {
+        return false;
+      }
+    }
   }
 
   AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
@@ -186,48 +229,27 @@ class AsyncNonMetaRegionLocator {
     }
   }
 
-  private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
-      HRegionLocation loc) {
-    if (future.isDone()) {
-      return true;
-    }
-    boolean completed;
-    if (req.locateType.equals(RegionLocateType.BEFORE)) {
-      // for locating the row before current row, the common case is to find the previous region in
-      // reverse scan, so we check the endKey first. In general, the condition should be startKey <
-      // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
-      // && startKey < req.row). The two conditions are equal since startKey < endKey.
-      int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
-      completed =
-          c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
-    } else {
-      completed = loc.getRegionInfo().containsRow(req.row);
-    }
-    if (completed) {
-      future.complete(loc);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
   private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
       Throwable error) {
     if (error != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to locate region in '" + tableName + "', row='" +
-            Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
-          error);
-      }
+      LOG.warn(
+        "Failed to locate region in '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
+            + "', locateType=" + req.locateType, error);
     }
-    LocateRequest toSend = null;
+    Optional<LocateRequest> toSend = Optional.empty();
     TableCache tableCache = getTableCache(tableName);
     if (loc != null) {
       if (!addToCache(tableCache, loc)) {
         // someone is ahead of us.
         synchronized (tableCache) {
           tableCache.pendingRequests.remove(req);
+          tableCache.clearCompletedRequests(Optional.empty());
+          // Remove a complete locate request in a synchronized block, so the table cache must have
+          // quota to send a candidate request.
+          toSend = tableCache.getCandidate();
+          toSend.ifPresent(r -> tableCache.send(r));
         }
+        toSend.ifPresent(r -> locateInMeta(tableName, r));
         return;
       }
     }
@@ -239,30 +261,13 @@ class AsyncNonMetaRegionLocator {
           future.completeExceptionally(error);
         }
       }
-      if (loc != null) {
-        for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
-            tableCache.allRequests.entrySet().iterator(); iter.hasNext();) {
-          Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
-          if (tryComplete(entry.getKey(), entry.getValue(), loc)) {
-            iter.remove();
-          }
-        }
-      }
-      if (!tableCache.allRequests.isEmpty() &&
-          tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
-        LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
-            .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new);
-        if (candidates.length > 0) {
-          // TODO: use a better algorithm to send a request which is more likely to fetch a new
-          // location.
-          toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
-          tableCache.send(toSend);
-        }
-      }
-    }
-    if (toSend != null) {
-      locateInMeta(tableName, toSend);
+      tableCache.clearCompletedRequests(Optional.ofNullable(loc));
+      // Remove a complete locate request in a synchronized block, so the table cache must have
+      // quota to send a candidate request.
+      toSend = tableCache.getCandidate();
+      toSend.ifPresent(r -> tableCache.send(r));
     }
+    toSend.ifPresent(r -> locateInMeta(tableName, r));
   }
 
   private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,

http://git-wip-us.apache.org/repos/asf/hbase/blob/59ffb611/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 0bb192b..80ed02e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -209,6 +209,7 @@ public class TestAsyncNonMetaRegionLocator {
         throw new RuntimeException(e);
       }
     }));
+
     LOCATOR.clearCache(TABLE_NAME);
     byte[][] endKeys = getEndKeys();
     IntStream.range(0, 2).forEach(