You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/01 02:20:25 UTC
hbase git commit: HBASE-16945 Implement AsyncRegionLocator
Repository: hbase
Updated Branches:
refs/heads/master 45a259424 -> 6cf9333e9
HBASE-16945 Implement AsyncRegionLocator
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6cf9333e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6cf9333e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6cf9333e
Branch: refs/heads/master
Commit: 6cf9333e978eb91d54935abdd6ebe9152e4ea183
Parents: 45a2594
Author: zhangduo <zh...@apache.org>
Authored: Mon Oct 31 20:39:43 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Nov 1 10:18:55 2016 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncConnectionImpl.java | 11 +-
.../hadoop/hbase/client/AsyncRegionLocator.java | 442 +++++++++++++++++--
.../client/AsyncRpcRetryingCallerFactory.java | 13 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 40 +-
.../client/AsyncTableRegionLocatorImpl.java | 2 +-
.../hadoop/hbase/util/CollectionUtils.java | 14 +
.../hbase/client/TestAsyncGetMultiThread.java | 148 +++++++
.../hbase/client/TestAsyncRegionLocator.java | 239 ++++++++++
...TestAsyncSingleRequestRpcRetryingCaller.java | 22 +-
9 files changed, 849 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 121a16b..6cad6a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -66,7 +66,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private final User user;
- private final AsyncRegistry registry;
+ final AsyncRegistry registry;
private final String clusterId;
@@ -87,15 +87,11 @@ class AsyncConnectionImpl implements AsyncConnection {
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
@SuppressWarnings("deprecation")
- public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
+ public AsyncConnectionImpl(Configuration conf, User user) {
this.conf = conf;
this.user = user;
-
this.connConf = new AsyncConnectionConfiguration(conf);
-
- this.locator = new AsyncRegionLocator(conf);
-
- // action below will not throw exception so no need to catch and close.
+ this.locator = new AsyncRegionLocator(this);
this.registry = ClusterRegistryFactory.getRegistry(conf);
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
if (LOG.isDebugEnabled()) {
@@ -122,7 +118,6 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public void close() {
- IOUtils.closeQuietly(locator);
IOUtils.closeQuietly(rpcClient);
IOUtils.closeQuietly(registry);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 321fd71..ba5a0e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,71 +17,437 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.util.Bytes;
/**
- * TODO: reimplement using aync connection when the scan logic is ready. The current implementation
- * is based on the blocking client.
+ * The asynchronous region locator.
*/
@InterfaceAudience.Private
-class AsyncRegionLocator implements Closeable {
+class AsyncRegionLocator {
- private final ConnectionImplementation conn;
+ private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class);
- AsyncRegionLocator(Configuration conf) throws IOException {
- conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
+ private final AsyncConnectionImpl conn;
+
+ private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+
+ private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
+ new AtomicReference<>();
+
+ private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
+ new ConcurrentHashMap<>();
+
+ AsyncRegionLocator(AsyncConnectionImpl conn) {
+ this.conn = conn;
+ }
+
+ private CompletableFuture<HRegionLocation> locateMetaRegion() {
+ for (;;) {
+ HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
+ if (metaRegionLocation != null) {
+ return CompletableFuture.completedFuture(metaRegionLocation);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Meta region location cache is null, try fetching from registry.");
+ }
+ if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start fetching meta region location from registry.");
+ }
+ CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+ conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> {
+ if (error != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to fetch meta region location from registry", error);
+ }
+ metaRelocateFuture.getAndSet(null).completeExceptionally(error);
+ return;
+ }
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The fetched meta region location is " + loc);
+ }
+ // Here we update cache before reset future, so it is possible that someone can get a
+ // stale value. Consider this:
+ // 1. update cache
+ // 2. someone clears the cache and relocates again
+ // 3. the metaRelocateFuture is not null so the old future is used.
+ // 4. we clear metaRelocateFuture and complete the future in it with the value being
+ // cleared in step 2.
+ // But we do not think it is a big deal as it rarely happens, and even if it happens, the
+ // caller will retry again later, no correctness problems.
+ this.metaRegionLocation.set(loc);
+ metaRelocateFuture.set(null);
+ future.complete(loc);
+ });
+ } else {
+ CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+ if (future != null) {
+ return future;
+ }
+ }
+ }
+ }
+
+ private static ConcurrentNavigableMap<byte[], HRegionLocation> createTableCache() {
+ return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ }
+
+ private void removeFromCache(HRegionLocation loc) {
+ ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
+ cache.get(loc.getRegionInfo().getTable());
+ if (tableCache == null) {
+ return;
+ }
+ tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
+ if (oldLoc.getSeqNum() > loc.getSeqNum()
+ || !oldLoc.getServerName().equals(loc.getServerName())) {
+ return oldLoc;
+ }
+ return null;
+ });
+ }
+
+ private void addToCache(HRegionLocation loc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Try adding " + loc + " to cache");
+ }
+ ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = computeIfAbsent(cache,
+ loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache);
+ byte[] startKey = loc.getRegionInfo().getStartKey();
+ HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc);
+ if (oldLoc == null) {
+ return;
+ }
+ if (oldLoc.getSeqNum() > loc.getSeqNum()
+ || oldLoc.getServerName().equals(loc.getServerName())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
+ + " is newer than us or has the same server name");
+ }
+ return;
+ }
+ tableCache.compute(startKey, (k, oldValue) -> {
+ if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
+ return loc;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
+ + " is newer than us or has the same server name."
+ + " Maybe it is updated before we replace it");
+ }
+ return oldValue;
+ });
+ }
+
+ private HRegionLocation locateInCache(TableName tableName, byte[] row) {
+ ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
+ if (tableCache == null) {
+ return null;
+ }
+ Map.Entry<byte[], HRegionLocation> entry = tableCache.floorEntry(row);
+ if (entry == null) {
+ return null;
+ }
+ HRegionLocation loc = entry.getValue();
+ byte[] endKey = loc.getRegionInfo().getEndKey();
+ if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+ return loc;
+ } else {
+ return null;
+ }
+ }
+
+ private void onScanComplete(CompletableFuture<HRegionLocation> future, TableName tableName,
+ byte[] row, List<Result> results, Throwable error, String rowNameInErrorMsg,
+ Consumer<HRegionLocation> otherCheck) {
+ if (error != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+ + Bytes.toStringBinary(row) + "'",
+ error);
+ }
+ future.completeExceptionally(error);
+ return;
+ }
+ if (results.isEmpty()) {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ return;
+ }
+ RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+ + Bytes.toStringBinary(row) + "' is " + locs);
+ }
+ if (locs == null || locs.getDefaultRegionLocation() == null) {
+ future.completeExceptionally(
+ new IOException(String.format("No location found for '%s', %s='%s'", tableName,
+ rowNameInErrorMsg, Bytes.toStringBinary(row))));
+ return;
+ }
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ HRegionInfo info = loc.getRegionInfo();
+ if (info == null) {
+ future.completeExceptionally(
+ new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
+ rowNameInErrorMsg, Bytes.toStringBinary(row))));
+ return;
+ }
+ if (!info.getTable().equals(tableName)) {
+ future.completeExceptionally(new TableNotFoundException(
+ "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
+ return;
+ }
+ if (info.isSplit()) {
+ future.completeExceptionally(new RegionOfflineException(
+ "the only available region for the required row is a split parent,"
+ + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
+ return;
+ }
+ if (info.isOffline()) {
+ future.completeExceptionally(new RegionOfflineException("the region is offline, could"
+ + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
+ return;
+ }
+ if (loc.getServerName() == null) {
+ future.completeExceptionally(new NoServerForRegionException(
+ String.format("No server address listed for region '%s', %s='%s'",
+ info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row))));
+ return;
+ }
+ otherCheck.accept(loc);
+ if (future.isDone()) {
+ return;
+ }
+ addToCache(loc);
+ future.complete(loc);
}
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- boolean reload) {
+ private CompletableFuture<HRegionLocation> locateInMeta(TableName tableName, byte[] row) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta");
+ }
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- try {
- future.complete(conn.getRegionLocation(tableName, row, reload));
- } catch (IOException e) {
- future.completeExceptionally(e);
+ byte[] metaKey = createRegionName(tableName, row, NINES, false);
+ conn.getTable(META_TABLE_NAME)
+ .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+ .whenComplete(
+ (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
+ }));
+ return future;
+ }
+
+ private CompletableFuture<HRegionLocation> locateRegion(TableName tableName, byte[] row) {
+ HRegionLocation loc = locateInCache(tableName, row);
+ if (loc != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
+ + Bytes.toStringBinary(row) + "'");
+ }
+ return CompletableFuture.completedFuture(loc);
+ }
+ return locateInMeta(tableName, row);
+ }
+
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
+ if (tableName.equals(META_TABLE_NAME)) {
+ return locateMetaRegion();
+ } else {
+ return locateRegion(tableName, row);
+ }
+ }
+
+ private HRegionLocation locatePreviousInCache(TableName tableName,
+ byte[] startRowOfCurrentRegion) {
+ ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
+ if (tableCache == null) {
+ return null;
+ }
+ Map.Entry<byte[], HRegionLocation> entry;
+ if (isEmptyStopRow(startRowOfCurrentRegion)) {
+ entry = tableCache.lastEntry();
+ } else {
+ entry = tableCache.lowerEntry(startRowOfCurrentRegion);
+ }
+ if (entry == null) {
+ return null;
}
+ HRegionLocation loc = entry.getValue();
+ if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
+ return loc;
+ } else {
+ return null;
+ }
+ }
+
+ private CompletableFuture<HRegionLocation> locatePreviousInMeta(TableName tableName,
+ byte[] startRowOfCurrentRegion) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
+ + Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta");
+ }
+ byte[] metaKey;
+ if (isEmptyStopRow(startRowOfCurrentRegion)) {
+ byte[] binaryTableName = tableName.getName();
+ metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+ } else {
+ metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
+ }
+ CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+ conn.getTable(META_TABLE_NAME)
+ .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+ .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
+ results, error, "startRowOfCurrentRegion", loc -> {
+ HRegionInfo info = loc.getRegionInfo();
+ if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) {
+ future.completeExceptionally(new IOException("The end key of '"
+ + info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey())
+ + "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"));
+ }
+ }));
return future;
}
+ private CompletableFuture<HRegionLocation> locatePreviousRegion(TableName tableName,
+ byte[] startRowOfCurrentRegion) {
+ HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion);
+ if (loc != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
+ + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
+ }
+ return CompletableFuture.completedFuture(loc);
+ }
+ return locatePreviousInMeta(tableName, startRowOfCurrentRegion);
+ }
+
+ /**
+ * Locate the previous region using the current region's start key. Used for reverse scan.
+ */
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
- byte[] startRowOfCurrentRegion, boolean reload) {
- CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion);
- try {
- for (;;) {
- HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
- byte[] endKey = loc.getRegionInfo().getEndKey();
- if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
- future.complete(loc);
- break;
- }
- toLocateRow = endKey;
+ byte[] startRowOfCurrentRegion) {
+ if (tableName.equals(META_TABLE_NAME)) {
+ return locateMetaRegion();
+ } else {
+ return locatePreviousRegion(tableName, startRowOfCurrentRegion);
+ }
+ }
+
+ private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
+ // No need to update if there is no cached location, or the cached location is newer, or it
+ // is on a different server than ours
+ return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
+ && oldLoc.getServerName().equals(loc.getServerName());
+ }
+
+ private void updateCachedLoation(HRegionLocation loc, Throwable exception,
+ Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+ Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
+ HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
+ }
+ if (!canUpdate(loc, oldLoc)) {
+ return;
+ }
+ Throwable cause = findException(exception);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The actual exception when updating " + loc, cause);
+ }
+ if (cause == null || !isMetaClearingException(cause)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Will not update " + loc + " because the exception is null or not the one we care about");
}
- } catch (IOException e) {
- future.completeExceptionally(e);
+ return;
+ }
+ if (cause instanceof RegionMovedException) {
+ RegionMovedException rme = (RegionMovedException) cause;
+ HRegionLocation newLoc =
+ new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
+ }
+ addToCache.accept(newLoc);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try removing " + loc + " from cache");
+ }
+ removeFromCache.accept(loc);
}
- return future;
}
- void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
- ServerName source) {
- conn.updateCachedLocations(tableName, regionName, row, exception, source);
+ void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+ if (loc.getRegionInfo().isMetaTable()) {
+ updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
+ for (;;) {
+ HRegionLocation oldLoc = metaRegionLocation.get();
+ if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
+ || oldLoc.getServerName().equals(newLoc.getServerName()))) {
+ return;
+ }
+ if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
+ return;
+ }
+ }
+ }, l -> {
+ for (;;) {
+ HRegionLocation oldLoc = metaRegionLocation.get();
+ if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
+ return;
+ }
+ }
+ });
+ } else {
+ updateCachedLoation(loc, exception, l -> {
+ ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
+ cache.get(l.getRegionInfo().getTable());
+ if (tableCache == null) {
+ return null;
+ }
+ return tableCache.get(l.getRegionInfo().getStartKey());
+ }, this::addToCache, this::removeFromCache);
+ }
}
- @Override
- public void close() {
- IOUtils.closeQuietly(conn);
+ void clearCache(TableName tableName) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Clear meta cache for " + tableName);
+ }
+ cache.remove(tableName);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 9020ce5..0d23c39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -68,8 +68,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public SingleRequestCallerBuilder<T>
- action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+ public SingleRequestCallerBuilder<T> action(
+ AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@@ -92,12 +92,9 @@ class AsyncRpcRetryingCallerFactory {
public AsyncSingleRequestRpcRetryingCaller<T> build() {
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
- locateToPreviousRegion
- ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re)
- : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re),
- checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
- conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
- conn.connConf.getStartLogErrorsCnt());
+ locateToPreviousRegion, checkNotNull(callable, "action is null"),
+ conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
+ rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 1d0357d..f10c9a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -60,12 +60,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
ClientService.Interface stub);
}
- @FunctionalInterface
- public interface RegionLocator {
- CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName,
- byte[] row, boolean reload);
- }
-
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
@@ -74,7 +68,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final byte[] row;
- private final RegionLocator locator;
+ private final Supplier<CompletableFuture<HRegionLocation>> locate;
private final Callable<T> callable;
@@ -97,13 +91,18 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final long startNs;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs,
- int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ TableName tableName, byte[] row, boolean locateToPreviousRegion, Callable<T> callable,
+ long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
+ int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.row = row;
- this.locator = locator;
+ if (locateToPreviousRegion) {
+ this.locate = this::locatePrevious;
+ } else {
+ this.locate = this::locate;
+ }
this.callable = callable;
this.pauseNs = pauseNs;
this.maxAttempts = retries2Attempts(maxRetries);
@@ -145,8 +144,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
if (tries > startLogErrorsCnt) {
LOG.warn(errMsg.get(), error);
}
- RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
- error, EnvironmentEdgeManager.currentTime(), "");
+ RetriesExhaustedException.ThrowableWithExtraContext qt =
+ new RetriesExhaustedException.ThrowableWithExtraContext(error,
+ EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
completeExceptionally();
@@ -194,8 +194,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
+ " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
- err -> conn.getLocator().updateCachedLocations(tableName,
- loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
+ err -> conn.getLocator().updateCachedLocation(loc, err));
return;
}
resetController();
@@ -207,8 +206,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
+ tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
- err -> conn.getLocator().updateCachedLocations(tableName,
- loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
+ err -> conn.getLocator().updateCachedLocation(loc, err));
return;
}
future.complete(result);
@@ -216,7 +214,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
}
private void locateThenCall() {
- locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> {
+ locate.get().whenComplete((loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
@@ -231,6 +229,14 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
});
}
+ private CompletableFuture<HRegionLocation> locate() {
+ return conn.getLocator().getRegionLocation(tableName, row);
+ }
+
+ private CompletableFuture<HRegionLocation> locatePrevious() {
+ return conn.getLocator().getPreviousRegionLocation(tableName, row);
+ }
+
public CompletableFuture<T> call() {
locateThenCall();
return future;
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index d715e24..b29f878 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
- return locator.getRegionLocation(tableName, row, reload);
+ return locator.getRegionLocation(tableName, row);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
index 775f8bd..4e19b77 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -108,6 +109,19 @@ public class CollectionUtils {
}
/**
+ * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
+ * value already exists. So here we copy the implementation of
+ * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and
+ * putIfAbsent to implement computeIfAbsent. And notice that the implementation does not guarantee
+ * that the supplier will only be executed once.
+ */
+ public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) {
+ V v, newValue;
+ return ((v = map.get(key)) == null && (newValue = supplier.get()) != null
+ && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
+ }
+
+ /**
* A supplier that throws IOException when get.
*/
@FunctionalInterface
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
new file mode 100644
index 0000000..b20e616
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Splits the table and moves regions randomly while the test is running.
+ */
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncGetMultiThread {
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+ private static int COUNT = 1000;
+
+ private static AsyncConnection CONN;
+
+ private static byte[][] SPLIT_KEYS;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
+ TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
+ TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
+ TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
+ TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+ TEST_UTIL.startMiniCluster(5);
+ SPLIT_KEYS = new byte[8][];
+ for (int i = 111; i < 999; i += 111) {
+ SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+ }
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+ AsyncTable table = CONN.getTable(TABLE_NAME);
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ IntStream.range(0, COUNT)
+ .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
+ .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ CONN.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
+ while (!stop.get()) {
+ int i = ThreadLocalRandom.current().nextInt(COUNT);
+ assertEquals(i,
+ Bytes.toInt(CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+ .get().getValue(FAMILY, QUALIFIER)));
+ }
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException, ExecutionException {
+ int numThreads = 20;
+ AtomicBoolean stop = new AtomicBoolean(false);
+ ExecutorService executor =
+ Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+ List<Future<?>> futures = new ArrayList<>();
+ IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
+ run(stop);
+ return null;
+ })));
+ Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
+ Admin admin = TEST_UTIL.getAdmin();
+ for (byte[] splitPoint : SPLIT_KEYS) {
+ admin.split(TABLE_NAME, splitPoint);
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+ region.compact(true);
+ }
+ Thread.sleep(5000);
+ admin.balancer(true);
+ Thread.sleep(5000);
+ ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+ ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+ .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
+ .findAny().get();
+ admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+ Bytes.toBytes(newMetaServer.getServerName()));
+ Thread.sleep(5000);
+ }
+ stop.set(true);
+ executor.shutdown();
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
new file mode 100644
index 0000000..2e46d8a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncRegionLocator {
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static AsyncConnectionImpl CONN;
+
+ private static AsyncRegionLocator LOCATOR;
+
+ private static byte[][] SPLIT_KEYS;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+ CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+ LOCATOR = CONN.getLocator();
+ SPLIT_KEYS = new byte[8][];
+ for (int i = 111; i < 999; i += 111) {
+ SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ CONN.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void tearDownAfterTest() throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ if (admin.tableExists(TABLE_NAME)) {
+ if (admin.isTableEnabled(TABLE_NAME)) {
+ TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+ }
+ TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
+ }
+ LOCATOR.clearCache(TABLE_NAME);
+ }
+
+ private void createSingleRegionTable() throws IOException, InterruptedException {
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ }
+
+ @Test
+ public void testNoTable() throws InterruptedException {
+ try {
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ try {
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ }
+
+ @Test
+ public void testDisableTable() throws IOException, InterruptedException {
+ createSingleRegionTable();
+ TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+ try {
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ try {
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ }
+
+ private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
+ HRegionLocation loc) {
+ HRegionInfo info = loc.getRegionInfo();
+ assertEquals(TABLE_NAME, info.getTable());
+ assertArrayEquals(startKey, info.getStartKey());
+ assertArrayEquals(endKey, info.getEndKey());
+ assertEquals(serverName, loc.getServerName());
+ }
+
+ @Test
+ public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
+ createSingleRegionTable();
+ ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
+ ThreadLocalRandom.current().nextBytes(randKey);
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
+ // Using a key which is not the end key of a region will cause an error
+ try {
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(IOException.class));
+ assertTrue(e.getCause().getMessage().contains("end key of"));
+ }
+ }
+
+ private void createMultiRegionTable() throws IOException, InterruptedException {
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ }
+
+ private static byte[][] getStartKeys() {
+ byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
+ startKeys[0] = EMPTY_START_ROW;
+ System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
+ return startKeys;
+ }
+
+ private static byte[][] getEndKeys() {
+ byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1);
+ endKeys[endKeys.length - 1] = EMPTY_START_ROW;
+ return endKeys;
+ }
+
+ @Test
+ public void testMultiRegionTable() throws IOException, InterruptedException {
+ createMultiRegionTable();
+ byte[][] startKeys = getStartKeys();
+ ServerName[] serverNames = new ServerName[startKeys.length];
+ TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .forEach(rs -> {
+ rs.getOnlineRegions(TABLE_NAME).forEach(r -> {
+ serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+ Bytes::compareTo)] = rs.getServerName();
+ });
+ });
+ IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
+ try {
+ assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
+ serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ LOCATOR.clearCache(TABLE_NAME);
+ byte[][] endKeys = getEndKeys();
+ IntStream.range(0, 2).forEach(
+ n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
+ try {
+ assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+
+ @Test
+ public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
+ createSingleRegionTable();
+ ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+ HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
+ ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
+ .findAny().get();
+
+ TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()),
+ Bytes.toBytes(newServerName.getServerName()));
+ while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
+ .equals(newServerName)) {
+ Thread.sleep(100);
+ }
+ // Should be the same instance, as it is still in the cache
+ assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ LOCATOR.updateCachedLocation(loc, null);
+ // A null error will not trigger a cache cleanup
+ assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index fd3938e..67d2661 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -151,11 +150,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
AtomicBoolean errorTriggered = new AtomicBoolean(false);
AtomicInteger count = new AtomicInteger(0);
HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
-
- try (AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn.getConfiguration()) {
+ AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn) {
@Override
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- boolean reload) {
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
if (tableName.equals(TABLE_NAME)) {
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
if (count.getAndIncrement() == 0) {
@@ -166,17 +163,22 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
}
return future;
} else {
- return super.getRegionLocation(tableName, row, reload);
+ return super.getRegionLocation(tableName, row);
}
}
@Override
- void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row,
- Object exception, ServerName source) {
+ CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+ byte[] startRowOfCurrentRegion) {
+ return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
+ }
+
+ @Override
+ void updateCachedLocation(HRegionLocation loc, Throwable exception) {
}
};
- AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(),
- User.getCurrent()) {
+ try (AsyncConnectionImpl mockedConn =
+ new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {