You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/24 07:48:32 UTC
[hbase] branch master updated: HBASE-21761 Align the methods in
RegionLocator and AsyncTableRegionLocator
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 416b70f HBASE-21761 Align the methods in RegionLocator and AsyncTableRegionLocator
416b70f is described below
commit 416b70f461694bf981d508b49368cc66881c2ef3
Author: zhangduo <zh...@apache.org>
AuthorDate: Wed Jan 23 21:26:56 2019 +0800
HBASE-21761 Align the methods in RegionLocator and AsyncTableRegionLocator
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 2 +-
.../hbase/client/AsyncTableRegionLocator.java | 74 +++++++++++-
.../hbase/client/AsyncTableRegionLocatorImpl.java | 89 ++++----------
.../apache/hadoop/hbase/client/RegionLocator.java | 3 +
.../org/apache/hadoop/hbase/util/FutureUtils.java | 33 ++---
...Locator.java => AbstractTestRegionLocator.java} | 109 +++++++++--------
.../hbase/client/TestAsyncTableRegionLocator.java | 116 ++++++++----------
.../hadoop/hbase/client/TestRegionLocator.java | 133 +++++----------------
8 files changed, 250 insertions(+), 309 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 3cbd950..4f6f083 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -149,7 +149,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
- return new AsyncTableRegionLocatorImpl(tableName, locator);
+ return new AsyncTableRegionLocatorImpl(tableName, this);
}
// we will override this method for testing retry caller, so do not remove this method.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index f67204a..321f44e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -56,7 +59,7 @@ public interface AsyncTableRegionLocator {
* @param reload true to reload information or false to use cached information
*/
default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
- return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload);
+ return getRegionLocation(row, RegionInfo.DEFAULT_REPLICA_ID, reload);
}
/**
@@ -83,8 +86,77 @@ public interface AsyncTableRegionLocator {
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
/**
+ * Find all the replicas for the region on which the given row is being served.
+ * @param row Row to find.
+ * @return Locations for all the replicas of the row.
+ * @throws IOException if a remote or network exception occurs
+ */
+ default CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row) {
+ return getRegionLocations(row, false);
+ }
+
+ /**
+ * Find all the replicas for the region on which the given row is being served.
+ * @param row Row to find.
+ * @param reload true to reload information or false to use cached information
+ * @return Locations for all the replicas of the row.
+ * @throws IOException if a remote or network exception occurs
+ */
+ CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row, boolean reload);
+
+ /**
* Retrieves all of the regions associated with this table.
+ * <p/>
+ * Usually we will go to meta table directly in this method so there is no {@code reload}
+ * parameter.
+ * <p/>
+ * Notice that the location for region replicas other than the default replica are also returned.
* @return a {@link List} of all regions associated with this table.
*/
CompletableFuture<List<HRegionLocation>> getAllRegionLocations();
+
+ /**
+ * Gets the starting row key for every region in the currently open table.
+ * <p>
+ * This is mainly useful for the MapReduce integration.
+ * @return Array of region starting row keys
+ * @throws IOException if a remote or network exception occurs
+ */
+ default CompletableFuture<List<byte[]>> getStartKeys() throws IOException {
+ return getStartEndKeys().thenApply(
+ startEndKeys -> startEndKeys.stream().map(Pair::getFirst).collect(Collectors.toList()));
+ }
+
+ /**
+ * Gets the ending row key for every region in the currently open table.
+ * <p>
+ * This is mainly useful for the MapReduce integration.
+ * @return Array of region ending row keys
+ * @throws IOException if a remote or network exception occurs
+ */
+ default CompletableFuture<List<byte[]>> getEndKeys() throws IOException {
+ return getStartEndKeys().thenApply(
+ startEndKeys -> startEndKeys.stream().map(Pair::getSecond).collect(Collectors.toList()));
+ }
+
+ /**
+ * Gets the starting and ending row keys for every region in the currently open table.
+ * <p>
+ * This is mainly useful for the MapReduce integration.
+ * @return Pair of arrays of region starting and ending row keys
+ * @throws IOException if a remote or network exception occurs
+ */
+ default CompletableFuture<List<Pair<byte[], byte[]>>> getStartEndKeys() throws IOException {
+ return getAllRegionLocations().thenApply(
+ locs -> locs.stream().filter(loc -> RegionReplicaUtil.isDefaultReplica(loc.getRegion()))
+ .map(HRegionLocation::getRegion).map(r -> Pair.newPair(r.getStartKey(), r.getEndKey()))
+ .collect(Collectors.toList()));
+ }
+
+ /**
+ * Clear all the entries in the region location cache.
+ * <p/>
+ * This may cause performance issue so use it with caution.
+ */
+ void clearRegionLocationCache();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 606ee7a..145e975 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -17,20 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
/**
* The implementation of AsyncRegionLocator.
*/
@@ -39,11 +34,11 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
private final TableName tableName;
- private final AsyncRegionLocator locator;
+ private final AsyncConnectionImpl conn;
- public AsyncTableRegionLocatorImpl(TableName tableName, AsyncRegionLocator locator) {
+ public AsyncTableRegionLocatorImpl(TableName tableName, AsyncConnectionImpl conn) {
this.tableName = tableName;
- this.locator = locator;
+ this.conn = conn;
}
@Override
@@ -54,67 +49,29 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
boolean reload) {
- return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
- -1L);
+ return conn.getLocator().getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT,
+ reload, -1L);
}
- // this is used to prevent stack overflow if there are thousands of regions for the table. If the
- // location is in cache, the CompletableFuture will be completed immediately inside the same
- // thread, and then in the action we will call locate again, also in the same thread. If all the
- // locations are in cache, and we do not use whenCompleteAsync to break the tie, the stack will be
- // very very deep and cause stack overflow.
- @VisibleForTesting
- static final ThreadLocal<MutableInt> STACK_DEPTH = new ThreadLocal<MutableInt>() {
-
- @Override
- protected MutableInt initialValue() {
- return new MutableInt(0);
+ @Override
+ public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
+ if (TableName.isMetaTableName(tableName)) {
+ return conn.registry.getMetaRegionLocation()
+ .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}
- };
-
- @VisibleForTesting
- static final int MAX_STACK_DEPTH = 16;
+ return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
+ Optional.of(tableName));
+ }
- private void locate(CompletableFuture<List<HRegionLocation>> future,
- ConcurrentLinkedQueue<HRegionLocation> result, byte[] row) {
- BiConsumer<HRegionLocation, Throwable> listener = (loc, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- result.add(loc);
- if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) {
- future.complete(result.stream()
- .sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion()))
- .collect(Collectors.toList()));
- } else {
- locate(future, result, loc.getRegion().getStartKey());
- }
- };
- MutableInt depth = STACK_DEPTH.get();
- boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH;
- try {
- CompletableFuture<HRegionLocation> f =
- locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L);
- if (async) {
- FutureUtils.addListenerAsync(f, listener);
- } else {
- FutureUtils.addListener(f, listener);
- }
- } finally {
- if (depth.decrementAndGet() == 0) {
- STACK_DEPTH.remove();
- }
- }
+ @Override
+ public CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row, boolean reload) {
+ return conn.getLocator()
+ .getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L)
+ .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}
@Override
- public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
- ConcurrentLinkedQueue<HRegionLocation> result = new ConcurrentLinkedQueue<>();
- CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- // start from end to start, as when locating we will do reverse scan, so we will prefetch the
- // location of the regions before the current one.
- locate(future, result, HConstants.EMPTY_END_ROW);
- return future;
+ public void clearRegionLocationCache() {
+ conn.getLocator().clearCache(tableName);
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
index fbea0f5..c7440c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
@@ -108,6 +108,9 @@ public interface RegionLocator extends Closeable {
/**
* Retrieves all of the regions associated with this table.
* <p/>
+ * Usually we will go to meta table directly in this method so there is no {@code reload}
+ * parameter.
+ * <p/>
* Notice that the location for region replicas other than the default replica are also returned.
* @return a {@link List} of all regions associated with this table.
* @throws IOException if a remote or network exception occurs
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 6c3e026..02ce655 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -17,12 +17,18 @@
*/
package org.apache.hadoop.hbase.util;
+import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
/**
* Helper class for processing futures.
*/
@@ -59,21 +65,18 @@ public final class FutureUtils {
}
/**
- * Almost the same with the {@link #addListener(CompletableFuture, BiConsumer)} method above, the
- * difference is that in this method we will call
- * {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of
- * {@link CompletableFuture#whenComplete(BiConsumer)}.
- * @see #addListener(CompletableFuture, BiConsumer)
+ * A helper class for getting the result of a Future, and convert the error to an
+ * {@link IOException}.
*/
- @SuppressWarnings("FutureReturnValueIgnored")
- public static <T> void addListenerAsync(CompletableFuture<T> future,
- BiConsumer<? super T, ? super Throwable> action) {
- future.whenCompleteAsync((resp, error) -> {
- try {
- action.accept(resp, error);
- } catch (Throwable t) {
- LOG.error("Unexpected error caught when processing CompletableFuture", t);
- }
- });
+ public static <T> T get(Future<T> future) throws IOException {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
+ }
}
}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
similarity index 58%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index e0634ae..b21c33f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -19,48 +19,34 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
-@Category({ MediumTests.class, ClientTests.class })
-public class TestRegionLocator {
+public abstract class AbstractTestRegionLocator {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRegionLocator.class);
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ protected static TableName TABLE_NAME = TableName.valueOf("Locator");
- private static TableName TABLE_NAME = TableName.valueOf("Locator");
+ protected static byte[] FAMILY = Bytes.toBytes("family");
- private static byte[] FAMILY = Bytes.toBytes("family");
+ protected static int REGION_REPLICATION = 3;
- private static int REGION_REPLICATION = 3;
+ protected static byte[][] SPLIT_KEYS;
- private static byte[][] SPLIT_KEYS;
-
- @BeforeClass
- public static void setUp() throws Exception {
+ protected static void startClusterAndCreateTable() throws Exception {
UTIL.startMiniCluster(3);
TableDescriptor td =
TableDescriptorBuilder.newBuilder(TABLE_NAME).setRegionReplication(REGION_REPLICATION)
@@ -74,16 +60,9 @@ public class TestRegionLocator {
UTIL.getAdmin().balancerSwitch(false, true);
}
- @AfterClass
- public static void tearDown() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
@After
public void tearDownAfterTest() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- locator.clearRegionLocationCache();
- }
+ clearCache();
}
private byte[] getStartKey(int index) {
@@ -110,13 +89,11 @@ public class TestRegionLocator {
@Test
public void testStartEndKeys() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- assertStartKeys(locator.getStartKeys());
- assertEndKeys(locator.getEndKeys());
- Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
- assertStartKeys(startEndKeys.getFirst());
- assertEndKeys(startEndKeys.getSecond());
- }
+ assertStartKeys(getStartKeys());
+ assertEndKeys(getEndKeys());
+ Pair<byte[][], byte[][]> startEndKeys = getStartEndKeys();
+ assertStartKeys(startEndKeys.getFirst());
+ assertEndKeys(startEndKeys.getSecond());
}
private void assertRegionLocation(HRegionLocation loc, int index, int replicaId) {
@@ -135,32 +112,54 @@ public class TestRegionLocator {
@Test
public void testGetRegionLocation() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- for (int i = 0; i <= SPLIT_KEYS.length; i++) {
- for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) {
- assertRegionLocation(locator.getRegionLocation(getStartKey(i), replicaId), i, replicaId);
- }
+ for (int i = 0; i <= SPLIT_KEYS.length; i++) {
+ for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) {
+ assertRegionLocation(getRegionLocation(getStartKey(i), replicaId), i, replicaId);
+ }
+ }
+ }
+
+ @Test
+ public void testGetRegionLocations() throws IOException {
+ for (int i = 0; i <= SPLIT_KEYS.length; i++) {
+ List<HRegionLocation> locs = getRegionLocations(getStartKey(i));
+ assertEquals(REGION_REPLICATION, locs.size());
+ for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) {
+ assertRegionLocation(locs.get(replicaId), i, replicaId);
}
}
}
@Test
public void testGetAllRegionLocations() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- List<HRegionLocation> locs = locator.getAllRegionLocations();
- assertEquals(REGION_REPLICATION * (SPLIT_KEYS.length + 1), locs.size());
- Collections.sort(locs, (l1, l2) -> {
- int c = Bytes.compareTo(l1.getRegion().getStartKey(), l2.getRegion().getStartKey());
- if (c != 0) {
- return c;
- }
- return Integer.compare(l1.getRegion().getReplicaId(), l2.getRegion().getReplicaId());
- });
- for (int i = 0; i <= SPLIT_KEYS.length; i++) {
- for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) {
- assertRegionLocation(locs.get(i * REGION_REPLICATION + replicaId), i, replicaId);
- }
+ List<HRegionLocation> locs = getAllRegionLocations();
+ assertEquals(REGION_REPLICATION * (SPLIT_KEYS.length + 1), locs.size());
+ Collections.sort(locs, (l1, l2) -> {
+ int c = Bytes.compareTo(l1.getRegion().getStartKey(), l2.getRegion().getStartKey());
+ if (c != 0) {
+ return c;
+ }
+ return Integer.compare(l1.getRegion().getReplicaId(), l2.getRegion().getReplicaId());
+ });
+ for (int i = 0; i <= SPLIT_KEYS.length; i++) {
+ for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) {
+ assertRegionLocation(locs.get(i * REGION_REPLICATION + replicaId), i, replicaId);
}
}
}
+
+ protected abstract byte[][] getStartKeys() throws IOException;
+
+ protected abstract byte[][] getEndKeys() throws IOException;
+
+ protected abstract Pair<byte[][], byte[][]> getStartEndKeys() throws IOException;
+
+ protected abstract HRegionLocation getRegionLocation(byte[] row, int replicaId)
+ throws IOException;
+
+ protected abstract List<HRegionLocation> getRegionLocations(byte[] row) throws IOException;
+
+ protected abstract List<HRegionLocation> getAllRegionLocations() throws IOException;
+
+ protected abstract void clearCache() throws IOException;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java
index 8652004..f32693a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java
@@ -17,102 +17,86 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.commons.math3.util.Pair;
+import static org.apache.hadoop.hbase.util.FutureUtils.get;
+
+import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableRegionLocator {
+public class TestAsyncTableRegionLocator extends AbstractTestRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableRegionLocator.class);
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("async");
-
private static AsyncConnection CONN;
+ private static AsyncTableRegionLocator LOCATOR;
+
@BeforeClass
public static void setUp() throws Exception {
- TEST_UTIL.startMiniCluster(3);
- TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("cf"));
- TEST_UTIL.waitTableAvailable(TABLE_NAME);
- TEST_UTIL.getAdmin().balancerSwitch(false, true);
- CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ startClusterAndCreateTable();
+ CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+ LOCATOR = CONN.getRegionLocator(TABLE_NAME);
}
@AfterClass
public static void tearDown() throws Exception {
Closeables.close(CONN, true);
- TEST_UTIL.shutdownMiniCluster();
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Override
+ protected byte[][] getStartKeys() throws IOException {
+ return get(LOCATOR.getStartKeys()).toArray(new byte[0][]);
+ }
+
+ @Override
+ protected byte[][] getEndKeys() throws IOException {
+ return get(LOCATOR.getEndKeys()).toArray(new byte[0][]);
}
- private void assertLocEquals(Map<RegionInfo, ServerName> region2Loc)
- throws InterruptedException, ExecutionException {
- for (HRegionLocation loc : CONN.getRegionLocator(TABLE_NAME).getAllRegionLocations().get()) {
- ServerName expected = region2Loc.remove(loc.getRegion());
- assertNotNull(expected);
- assertEquals(expected, loc.getServerName());
+ @Override
+ protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
+ List<Pair<byte[], byte[]>> startEndKeys = get(LOCATOR.getStartEndKeys());
+ byte[][] startKeys = new byte[startEndKeys.size()][];
+ byte[][] endKeys = new byte[startEndKeys.size()][];
+ for (int i = 0, n = startEndKeys.size(); i < n; i++) {
+ Pair<byte[], byte[]> pair = startEndKeys.get(i);
+ startKeys[i] = pair.getFirst();
+ endKeys[i] = pair.getSecond();
}
+ return Pair.newPair(startKeys, endKeys);
+ }
+
+ @Override
+ protected HRegionLocation getRegionLocation(byte[] row, int replicaId) throws IOException {
+ return get(LOCATOR.getRegionLocation(row, replicaId));
+ }
+
+ @Override
+ protected List<HRegionLocation> getRegionLocations(byte[] row) throws IOException {
+ return get(LOCATOR.getRegionLocations(row));
+ }
+
+ @Override
+ protected List<HRegionLocation> getAllRegionLocations() throws IOException {
+ return get(LOCATOR.getAllRegionLocations());
}
- @Test
- public void testGetAll() throws InterruptedException, ExecutionException {
- Map<RegionInfo, ServerName> region2Loc = TEST_UTIL.getMiniHBaseCluster()
- .getRegionServerThreads().stream().map(t -> t.getRegionServer())
- .flatMap(rs -> rs.getRegions(TABLE_NAME).stream()
- .map(r -> Pair.create(r.getRegionInfo(), rs.getServerName())))
- .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
- MutableInt maxDepth = new MutableInt(0);
- MutableInt depth = new MutableInt(0) {
-
- private static final long serialVersionUID = 5887112211305087650L;
-
- @Override
- public int incrementAndGet() {
- int val = super.incrementAndGet();
- if (val > maxDepth.intValue()) {
- maxDepth.setValue(val);
- }
- return val;
- }
- };
- // first time, read from meta
- AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
- assertLocEquals(new HashMap<>(region2Loc));
- assertTrue(maxDepth.intValue() > 0);
- assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
-
- // second time, read from cache
- maxDepth.setValue(0);
- depth.setValue(0);
- AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
- assertLocEquals(new HashMap<>(region2Loc));
- assertTrue(maxDepth.intValue() > 0);
- assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
+ @Override
+ protected void clearCache() throws IOException {
+ LOCATOR.clearRegionLocationCache();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java
index e0634ae..49ce8b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocator.java
@@ -17,150 +17,73 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
@Category({ MediumTests.class, ClientTests.class })
-public class TestRegionLocator {
+public class TestRegionLocator extends AbstractTestRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionLocator.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("Locator");
-
- private static byte[] FAMILY = Bytes.toBytes("family");
-
- private static int REGION_REPLICATION = 3;
-
- private static byte[][] SPLIT_KEYS;
+ private static RegionLocator LOCATOR;
@BeforeClass
public static void setUp() throws Exception {
- UTIL.startMiniCluster(3);
- TableDescriptor td =
- TableDescriptorBuilder.newBuilder(TABLE_NAME).setRegionReplication(REGION_REPLICATION)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
- SPLIT_KEYS = new byte[9][];
- for (int i = 0; i < 9; i++) {
- SPLIT_KEYS[i] = Bytes.toBytes(Integer.toString(i + 1));
- }
- UTIL.getAdmin().createTable(td, SPLIT_KEYS);
- UTIL.waitTableAvailable(TABLE_NAME);
- UTIL.getAdmin().balancerSwitch(false, true);
+ startClusterAndCreateTable();
+ LOCATOR = UTIL.getConnection().getRegionLocator(TABLE_NAME);
}
@AfterClass
public static void tearDown() throws Exception {
+ Closeables.close(LOCATOR, true);
UTIL.shutdownMiniCluster();
}
- @After
- public void tearDownAfterTest() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- locator.clearRegionLocationCache();
- }
- }
-
- private byte[] getStartKey(int index) {
- return index == 0 ? HConstants.EMPTY_START_ROW : SPLIT_KEYS[index - 1];
- }
-
- private byte[] getEndKey(int index) {
- return index == SPLIT_KEYS.length ? HConstants.EMPTY_END_ROW : SPLIT_KEYS[index];
+ @Override
+ protected byte[][] getStartKeys() throws IOException {
+ return LOCATOR.getStartKeys();
}
- private void assertStartKeys(byte[][] startKeys) {
- assertEquals(SPLIT_KEYS.length + 1, startKeys.length);
- for (int i = 0; i < startKeys.length; i++) {
- assertArrayEquals(getStartKey(i), startKeys[i]);
- }
+ @Override
+ protected byte[][] getEndKeys() throws IOException {
+ return LOCATOR.getEndKeys();
}
- private void assertEndKeys(byte[][] endKeys) {
- assertEquals(SPLIT_KEYS.length + 1, endKeys.length);
- for (int i = 0; i < endKeys.length; i++) {
- assertArrayEquals(getEndKey(i), endKeys[i]);
- }
+ @Override
+ protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
+ return LOCATOR.getStartEndKeys();
}
- @Test
- public void testStartEndKeys() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- assertStartKeys(locator.getStartKeys());
- assertEndKeys(locator.getEndKeys());
- Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
- assertStartKeys(startEndKeys.getFirst());
- assertEndKeys(startEndKeys.getSecond());
- }
+ @Override
+ protected HRegionLocation getRegionLocation(byte[] row, int replicaId) throws IOException {
+ return LOCATOR.getRegionLocation(row, replicaId);
}
- private void assertRegionLocation(HRegionLocation loc, int index, int replicaId) {
- RegionInfo region = loc.getRegion();
- byte[] startKey = getStartKey(index);
- assertArrayEquals(startKey, region.getStartKey());
- assertArrayEquals(getEndKey(index), region.getEndKey());
- assertEquals(replicaId, region.getReplicaId());
- ServerName expected =
- UTIL.getMiniHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
- .filter(rs -> rs.getRegions(TABLE_NAME).stream().map(Region::getRegionInfo)
- .anyMatch(r -> r.containsRow(startKey) && r.getReplicaId() == replicaId))
- .findFirst().get().getServerName();
- assertEquals(expected, loc.getServerName());
+ @Override
+ protected List<HRegionLocation> getRegionLocations(byte[] row) throws IOException {
+ return LOCATOR.getRegionLocations(row);
}
- @Test
- public void testGetRegionLocation() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- for (int i = 0; i <= SPLIT_KEYS.length; i++) {
- for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) {
- assertRegionLocation(locator.getRegionLocation(getStartKey(i), replicaId), i, replicaId);
- }
- }
- }
+ @Override
+ protected List<HRegionLocation> getAllRegionLocations() throws IOException {
+ return LOCATOR.getAllRegionLocations();
}
- @Test
- public void testGetAllRegionLocations() throws IOException {
- try (RegionLocator locator = UTIL.getConnection().getRegionLocator(TABLE_NAME)) {
- List<HRegionLocation> locs = locator.getAllRegionLocations();
- assertEquals(REGION_REPLICATION * (SPLIT_KEYS.length + 1), locs.size());
- Collections.sort(locs, (l1, l2) -> {
- int c = Bytes.compareTo(l1.getRegion().getStartKey(), l2.getRegion().getStartKey());
- if (c != 0) {
- return c;
- }
- return Integer.compare(l1.getRegion().getReplicaId(), l2.getRegion().getReplicaId());
- });
- for (int i = 0; i <= SPLIT_KEYS.length; i++) {
- for (int replicaId = 0; replicaId < REGION_REPLICATION; replicaId++) {
- assertRegionLocation(locs.get(i * REGION_REPLICATION + replicaId), i, replicaId);
- }
- }
- }
+ @Override
+ protected void clearCache() throws IOException {
+ LOCATOR.clearRegionLocationCache();
}
}