You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/19 03:18:42 UTC
[hbase] branch master updated: HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 882bd56 HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator
882bd56 is described below
commit 882bd564f4d20c0682872af6c0e12d04af3f7e22
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Jan 17 17:42:00 2019 +0800
HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator
---
.../hbase/client/AsyncTableRegionLocator.java | 15 ++-
.../hbase/client/AsyncTableRegionLocatorImpl.java | 70 +++++++++++-
.../org/apache/hadoop/hbase/util/FutureUtils.java | 19 ++++
.../hbase/client/TestAsyncTableRegionLocator.java | 118 +++++++++++++++++++++
4 files changed, 217 insertions(+), 5 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index 3bda38e..f67204a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -39,7 +40,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region on which the given row is being served. Does not reload the cache.
- * <p>
+ * <p/>
* Returns the location of the region to which the row belongs.
* @param row Row to find.
*/
@@ -49,7 +50,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region on which the given row is being served.
- * <p>
+ * <p/>
* Returns the location of the region to which the row belongs.
* @param row Row to find.
* @param reload true to reload information or false to use cached information
@@ -60,7 +61,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
- * <p>
+ * <p/>
* Returns the location of the region with the given <code>replicaId</code> to which the row
* belongs.
* @param row Row to find.
@@ -72,7 +73,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
- * <p>
+ * <p/>
* Returns the location of the region with the given <code>replicaId</code> to which the row
* belongs.
* @param row Row to find.
@@ -80,4 +81,10 @@ public interface AsyncTableRegionLocator {
* @param reload true to reload information or false to use cached information
*/
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
+
+ /**
+ * Retrieves the locations of all of the regions associated with this table.
+ * @return a future which completes with a {@link List} of all region locations for this table.
+ */
+ CompletableFuture<List<HRegionLocation>> getAllRegionLocations();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 465a411..606ee7a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -17,12 +17,20 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
/**
* The implementation of AsyncRegionLocator.
*/
@@ -49,4 +57,64 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
-1L);
}
+
+ // This is used to prevent stack overflow if there are thousands of regions for the table. If the
+ // location is in cache, the CompletableFuture will be completed immediately inside the same
+ // thread, and then in the action we will call locate again, also in the same thread. If all the
+ // locations are in cache, and we do not use whenCompleteAsync to break the chain of nested calls,
+ // the stack will become very deep and cause a stack overflow.
+ @VisibleForTesting
+ static final ThreadLocal<MutableInt> STACK_DEPTH = new ThreadLocal<MutableInt>() {
+
+ @Override
+ protected MutableInt initialValue() {
+ return new MutableInt(0);
+ }
+ };
+
+ @VisibleForTesting
+ static final int MAX_STACK_DEPTH = 16;
+
+ private void locate(CompletableFuture<List<HRegionLocation>> future,
+ ConcurrentLinkedQueue<HRegionLocation> result, byte[] row) {
+ BiConsumer<HRegionLocation, Throwable> listener = (loc, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ result.add(loc);
+ if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) {
+ future.complete(result.stream()
+ .sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion()))
+ .collect(Collectors.toList()));
+ } else {
+ locate(future, result, loc.getRegion().getStartKey());
+ }
+ };
+ MutableInt depth = STACK_DEPTH.get();
+ boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH;
+ try {
+ CompletableFuture<HRegionLocation> f =
+ locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L);
+ if (async) {
+ FutureUtils.addListenerAsync(f, listener);
+ } else {
+ FutureUtils.addListener(f, listener);
+ }
+ } finally {
+ if (depth.decrementAndGet() == 0) {
+ STACK_DEPTH.remove();
+ }
+ }
+ }
+
+ @Override
+ public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
+ ConcurrentLinkedQueue<HRegionLocation> result = new ConcurrentLinkedQueue<>();
+ CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+ // Walk from the last region towards the first: locating does a reverse scan, so the locations
+ // of the regions before the current one will be prefetched.
+ locate(future, result, HConstants.EMPTY_END_ROW);
+ return future;
+ }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 067e66b..6c3e026 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -57,4 +57,23 @@ public final class FutureUtils {
}
});
}
+
+ /**
+ * Almost the same as the {@link #addListener(CompletableFuture, BiConsumer)} method above; the
+ * difference is that in this method we will call
+ * {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of
+ * {@link CompletableFuture#whenComplete(BiConsumer)}.
+ * @see #addListener(CompletableFuture, BiConsumer)
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public static <T> void addListenerAsync(CompletableFuture<T> future,
+ BiConsumer<? super T, ? super Throwable> action) {
+ future.whenCompleteAsync((resp, error) -> {
+ try {
+ action.accept(resp, error);
+ } catch (Throwable t) {
+ LOG.error("Unexpected error caught when processing CompletableFuture", t);
+ }
+ });
+ }
}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java
new file mode 100644
index 0000000..8652004
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionLocator {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableRegionLocator.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static AsyncConnection CONN;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("cf"));
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ TEST_UTIL.getAdmin().balancerSwitch(false, true);
+ CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ Closeables.close(CONN, true);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private void assertLocEquals(Map<RegionInfo, ServerName> region2Loc)
+ throws InterruptedException, ExecutionException {
+ for (HRegionLocation loc : CONN.getRegionLocator(TABLE_NAME).getAllRegionLocations().get()) {
+ ServerName expected = region2Loc.remove(loc.getRegion());
+ assertNotNull(expected);
+ assertEquals(expected, loc.getServerName());
+ }
+ }
+
+ @Test
+ public void testGetAll() throws InterruptedException, ExecutionException {
+ Map<RegionInfo, ServerName> region2Loc = TEST_UTIL.getMiniHBaseCluster()
+ .getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .flatMap(rs -> rs.getRegions(TABLE_NAME).stream()
+ .map(r -> Pair.create(r.getRegionInfo(), rs.getServerName())))
+ .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
+ MutableInt maxDepth = new MutableInt(0);
+ MutableInt depth = new MutableInt(0) {
+
+ private static final long serialVersionUID = 5887112211305087650L;
+
+ @Override
+ public int incrementAndGet() {
+ int val = super.incrementAndGet();
+ if (val > maxDepth.intValue()) {
+ maxDepth.setValue(val);
+ }
+ return val;
+ }
+ };
+ // first time, read from meta
+ AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
+ assertLocEquals(new HashMap<>(region2Loc));
+ assertTrue(maxDepth.intValue() > 0);
+ assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
+
+ // second time, read from cache
+ maxDepth.setValue(0);
+ depth.setValue(0);
+ AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
+ assertLocEquals(new HashMap<>(region2Loc));
+ assertTrue(maxDepth.intValue() > 0);
+ assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
+ }
+}