You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/19 03:19:03 UTC

[hbase] branch branch-2 updated: HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ff272e8  HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator
ff272e8 is described below

commit ff272e86839a791f370e702dfe9d09b7478232a1
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Jan 17 17:42:00 2019 +0800

    HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator
---
 .../hbase/client/AsyncTableRegionLocator.java      |  15 ++-
 .../hbase/client/AsyncTableRegionLocatorImpl.java  |  70 +++++++++++-
 .../org/apache/hadoop/hbase/util/FutureUtils.java  |  19 ++++
 .../hbase/client/TestAsyncTableRegionLocator.java  | 118 +++++++++++++++++++++
 4 files changed, 217 insertions(+), 5 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index 3bda38e..f67204a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -39,7 +40,7 @@ public interface AsyncTableRegionLocator {
 
   /**
    * Finds the region on which the given row is being served. Does not reload the cache.
-   * <p>
+   * <p/>
    * Returns the location of the region to which the row belongs.
    * @param row Row to find.
    */
@@ -49,7 +50,7 @@ public interface AsyncTableRegionLocator {
 
   /**
    * Finds the region on which the given row is being served.
-   * <p>
+   * <p/>
    * Returns the location of the region to which the row belongs.
    * @param row Row to find.
    * @param reload true to reload information or false to use cached information
@@ -60,7 +61,7 @@ public interface AsyncTableRegionLocator {
 
   /**
    * Finds the region with the given <code>replicaId</code> on which the given row is being served.
-   * <p>
+   * <p/>
    * Returns the location of the region with the given <code>replicaId</code> to which the row
    * belongs.
    * @param row Row to find.
@@ -72,7 +73,7 @@ public interface AsyncTableRegionLocator {
 
   /**
    * Finds the region with the given <code>replicaId</code> on which the given row is being served.
-   * <p>
+   * <p/>
    * Returns the location of the region with the given <code>replicaId</code> to which the row
    * belongs.
    * @param row Row to find.
@@ -80,4 +81,10 @@ public interface AsyncTableRegionLocator {
    * @param reload true to reload information or false to use cached information
    */
   CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
+
+  /**
+   * Retrieves the locations of all of the regions associated with this table.
+   * @return a future which yields a {@link List} of the locations of all regions of this table.
+   */
+  CompletableFuture<List<HRegionLocation>> getAllRegionLocations();
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 465a411..606ee7a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -17,12 +17,20 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 /**
  * The implementation of AsyncRegionLocator.
  */
@@ -49,4 +57,64 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
     return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
       -1L);
   }
+
+  // Tracks how deep the current thread is nested inside locate(), to prevent a stack overflow if
+  // there are thousands of regions for the table. If a location is in cache, its CompletableFuture
+  // is completed immediately inside the same thread, and the listener then calls locate again, also
+  // in the same thread. If all the locations are in cache and we never use whenCompleteAsync to
+  // break the chain, the stack keeps growing with every region and will finally overflow.
+  @VisibleForTesting
+  static final ThreadLocal<MutableInt> STACK_DEPTH = new ThreadLocal<MutableInt>() {
+
+    @Override
+    protected MutableInt initialValue() {
+      return new MutableInt(0);
+    }
+  };
+
+  @VisibleForTesting
+  static final int MAX_STACK_DEPTH = 16;
+
+  private void locate(CompletableFuture<List<HRegionLocation>> future,
+      ConcurrentLinkedQueue<HRegionLocation> result, byte[] row) {
+    BiConsumer<HRegionLocation, Throwable> listener = (loc, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error); // one failed lookup fails the whole request
+        return;
+      }
+      result.add(loc);
+      if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) { // reached table start
+        future.complete(result.stream()
+          .sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion()))
+          .collect(Collectors.toList()));
+      } else {
+        locate(future, result, loc.getRegion().getStartKey()); // continue before this region
+      }
+    };
+    MutableInt depth = STACK_DEPTH.get();
+    boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH; // too deep: use async listener
+    try {
+      CompletableFuture<HRegionLocation> f =
+        locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L);
+      if (async) {
+        FutureUtils.addListenerAsync(f, listener);
+      } else {
+        FutureUtils.addListener(f, listener);
+      }
+    } finally {
+      if (depth.decrementAndGet() == 0) {
+        STACK_DEPTH.remove(); // outermost call on this thread: drop the thread local
+      }
+    }
+  }
+
+  @Override
+  public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
+    ConcurrentLinkedQueue<HRegionLocation> result = new ConcurrentLinkedQueue<>();
+    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+    // Walk from the end of the table to the start: locating does a reverse scan, so each lookup
+    // will also prefetch the locations of the regions before the current one.
+    locate(future, result, HConstants.EMPTY_END_ROW);
+    return future;
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 067e66b..6c3e026 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -57,4 +57,23 @@ public final class FutureUtils {
       }
     });
   }
+
+  /**
+   * Almost the same as the {@link #addListener(CompletableFuture, BiConsumer)} method above; the
+   * only difference is that this method registers the action with
+   * {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of
+   * {@link CompletableFuture#whenComplete(BiConsumer)}.
+   * @see #addListener(CompletableFuture, BiConsumer)
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public static <T> void addListenerAsync(CompletableFuture<T> future,
+      BiConsumer<? super T, ? super Throwable> action) {
+    future.whenCompleteAsync((resp, error) -> {
+      try {
+        action.accept(resp, error);
+      } catch (Throwable t) {
+        LOG.error("Unexpected error caught when processing CompletableFuture", t);
+      }
+    });
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java
new file mode 100644
index 0000000..8652004
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionLocator {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableRegionLocator.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("cf"));
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    TEST_UTIL.getAdmin().balancerSwitch(false, true);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void assertLocEquals(Map<RegionInfo, ServerName> region2Loc)
+      throws InterruptedException, ExecutionException {
+    for (HRegionLocation loc : CONN.getRegionLocator(TABLE_NAME).getAllRegionLocations().get()) {
+      ServerName expected = region2Loc.remove(loc.getRegion()); // null if unknown or repeated
+      assertNotNull(expected);
+      assertEquals(expected, loc.getServerName());
+    }
+  }
+
+  @Test
+  public void testGetAll() throws InterruptedException, ExecutionException {
+    Map<RegionInfo, ServerName> region2Loc = TEST_UTIL.getMiniHBaseCluster()
+      .getRegionServerThreads().stream().map(t -> t.getRegionServer())
+      .flatMap(rs -> rs.getRegions(TABLE_NAME).stream()
+        .map(r -> Pair.create(r.getRegionInfo(), rs.getServerName())))
+      .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
+    MutableInt maxDepth = new MutableInt(0);
+    MutableInt depth = new MutableInt(0) {
+
+      private static final long serialVersionUID = 5887112211305087650L;
+
+      @Override
+      public int incrementAndGet() {
+        int val = super.incrementAndGet();
+        if (val > maxDepth.intValue()) {
+          maxDepth.setValue(val);
+        }
+        return val;
+      }
+    };
+    // first time, read from meta; install the counter which records the max depth reached
+    AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
+    assertLocEquals(new HashMap<>(region2Loc));
+    assertTrue(maxDepth.intValue() > 0);
+    assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
+
+    // second time, read from cache; STACK_DEPTH was removed when depth hit 0, so set it again
+    maxDepth.setValue(0);
+    depth.setValue(0);
+    AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
+    assertLocEquals(new HashMap<>(region2Loc));
+    assertTrue(maxDepth.intValue() > 0);
+    assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
+  }
+}