You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/07/20 01:51:01 UTC

[GitHub] [hbase] Apache9 commented on a change in pull request #2095: HBASE-24459 Move the locateMeta logic from AsyncMetaRegionTableLocato…

Apache9 commented on a change in pull request #2095:
URL: https://github.com/apache/hbase/pull/2095#discussion_r456988605



##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  public static RegionLocations locateRow(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static RegionLocations locateRowBefore(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static void tryClearMasterStubCache(IOException error,
+    ClientMetaService.Interface currentStub, AtomicReference<ClientMetaService.Interface> stub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry registry,
+    AtomicReference<T> stub, AtomicReference<CompletableFuture<T>> stubMakeFuture,
+    RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+    Function<RpcChannel, T> stubMaker, String type) {
+    return getOrFetch(stub, stubMakeFuture, () -> {
+      CompletableFuture<T> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(stubMaker.apply(
+              rpcClient.createRpcChannel(addr, user, toIntNoOverflow(unit.toMillis(rpcTimeout)))));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+      });
+      return future;
+    }, type);
+  }
+
+  private static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
+    AtomicReference<CompletableFuture<T>> futureRef, 
+    Supplier<CompletableFuture<T>> fetch, String type) {
+    for (;;) {
+      T cachedValue = cacheRef.get();
+      if (cachedValue != null) {
+        return CompletableFuture.completedFuture(cachedValue);
+      }
+      LOG.trace("{} cache is null, try fetching from registry", type);
+      if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
+        LOG.debug("Start fetching {} from registry", type);
+        CompletableFuture<T> future = futureRef.get();
+        addListener(fetch.get(), (value, error) -> {
+          if (error != null) {
+            LOG.debug("Failed to fetch {} from registry", type, error);
+            futureRef.getAndSet(null).completeExceptionally(error);
+            return;
+          }
+          LOG.debug("The fetched {} is {}", type, value);
+          // Here we update cache before reset future, so it is possible that someone can get a
+          // stale value. Consider this:
+          // 1. update cacheRef
+          // 2. someone clears the cache and relocates again
+          // 3. the futureRef is not null so the old future is used.
+          // 4. we clear futureRef and complete the future in it with the value being
+          // cleared in step 2.
+          // But we do not think it is a big deal as it rarely happens, and even if it happens, the
+          // caller will retry again later, no correctness problems.
+          cacheRef.set(value);
+          futureRef.set(null);
+          future.complete(value);
+        });
+        return future;
+      } else {
+        CompletableFuture<T> future = futureRef.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+  public static CompletableFuture<List<HRegionLocation>> getAllMetaRegionLocations(
+    boolean excludeOfflinedSplitParents,
+    CompletableFuture<ClientMetaService.Interface> getStubFuture,
+    AtomicReference<ClientMetaService.Interface> stubRef,
+    RpcControllerFactory rpcControllerFactory, int callTimeoutMs) {

Review comment:
       The class is IA.Private so I think we could do this later when we really have the requirement?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org