You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/18 00:53:11 UTC
hbase git commit: HBASE-17282 Reduce the redundant requests to meta
table
Repository: hbase
Updated Branches:
refs/heads/master da356069f -> f041306cd
HBASE-17282 Reduce the redundant requests to meta table
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f041306c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f041306c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f041306c
Branch: refs/heads/master
Commit: f041306cdae701e91b314234b413af98fd1f6b18
Parents: da35606
Author: zhangduo <zh...@apache.org>
Authored: Sat Dec 17 21:01:14 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Dec 18 08:52:06 2016 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncConnectionImpl.java | 2 +-
.../hbase/client/AsyncMetaRegionLocator.java | 121 +++++
.../hbase/client/AsyncNonMetaRegionLocator.java | 487 +++++++++++++++++++
.../hadoop/hbase/client/AsyncRegionLocator.java | 377 ++------------
.../hbase/client/TestAsyncGetMultiThread.java | 3 +-
.../client/TestAsyncNonMetaRegionLocator.java | 240 +++++++++
...syncNonMetaRegionLocatorConcurrenyLimit.java | 159 ++++++
.../hbase/client/TestAsyncRegionLocator.java | 239 ---------
.../client/TestAsyncRegionLocatorTimeout.java | 3 +-
.../hadoop/hbase/client/TestAsyncTable.java | 3 +-
.../hbase/client/TestAsyncTableNoncedRetry.java | 3 +-
11 files changed, 1048 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 92785fb..d660b02 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -95,7 +95,6 @@ class AsyncConnectionImpl implements AsyncConnection {
this.conf = conf;
this.user = user;
this.connConf = new AsyncConnectionConfiguration(conf);
- this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
this.registry = AsyncRegistryFactory.getRegistry(conf);
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
if (LOG.isDebugEnabled()) {
@@ -107,6 +106,7 @@ class AsyncConnectionImpl implements AsyncConnection {
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
+ this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
nonceGenerator = PerClientRandomNonceGenerator.get();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
new file mode 100644
index 0000000..5b7a68f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The asynchronous locator for meta region.
+ */
+@InterfaceAudience.Private
+class AsyncMetaRegionLocator {
+
+ private static final Log LOG = LogFactory.getLog(AsyncMetaRegionLocator.class);
+
+ private final AsyncRegistry registry;
+
+ private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+
+ private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
+ new AtomicReference<>();
+
+ /**
+ * Creates a locator that looks up the meta region through the given registry.
+ * @param registry the registry queried for the meta region location on a cache miss
+ */
+ AsyncMetaRegionLocator(AsyncRegistry registry) {
+ this.registry = registry;
+ }
+
+  /**
+   * Returns the location of the meta region, fetching it from the registry when it is not cached.
+   * <p>
+   * A cached location is returned as an already completed future. Otherwise at most one registry
+   * fetch is tracked in {@code metaRelocateFuture}: the caller that installs the future starts the
+   * fetch and returns that future, and concurrent callers return the same future.
+   * @return a future completed with the meta region location, or completed exceptionally with the
+   *         error reported by the registry
+   */
+  CompletableFuture<HRegionLocation> getRegionLocation() {
+    for (;;) {
+      HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
+      if (metaRegionLocation != null) {
+        return CompletableFuture.completedFuture(metaRegionLocation);
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Meta region location cache is null, try fetching from registry.");
+      }
+      if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Start fetching meta region location from registry.");
+        }
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        registry.getMetaRegionLocation().whenComplete((locs, error) -> {
+          if (error != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Failed to fetch meta region location from registry", error);
+            }
+            metaRelocateFuture.getAndSet(null).completeExceptionally(error);
+            return;
+          }
+          HRegionLocation loc = locs.getDefaultRegionLocation();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The fetched meta region location is " + loc);
+          }
+          // Here we update cache before reset future, so it is possible that someone can get a
+          // stale value. Consider this:
+          // 1. update cache
+          // 2. someone clear the cache and relocate again
+          // 3. the metaRelocateFuture is not null so the old future is used.
+          // 4. we clear metaRelocateFuture and complete the future in it with the value being
+          // cleared in step 2.
+          // But we do not think it is a big deal as it rarely happens, and even if it happens, the
+          // caller will retry again later, no correctness problems.
+          this.metaRegionLocation.set(loc);
+          metaRelocateFuture.set(null);
+          future.complete(loc);
+        });
+        // Hand the future we installed straight back to the caller. Looping again instead would
+        // drop a registry failure that has already been delivered (the callback resets
+        // metaRelocateFuture to null) and immediately start another fetch, which can spin without
+        // bound while the registry keeps failing.
+        return future;
+      } else {
+        // Somebody else is fetching; share their future. It can already be null again if that
+        // fetch finished in between, in which case we simply start over.
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+ /**
+ * Delegates to the statically imported {@code updateCachedLoation} helper, passing three
+ * callbacks that operate on the single cached meta location: a getter, an action that installs
+ * a new location, and an action that removes the cached location.
+ * NOTE(review): when each callback is invoked is decided by the helper, which is not shown here.
+ * @param loc the location associated with the failure (presumably the one the request was sent
+ *          to; confirm against the helper)
+ * @param exception the error that triggered the update
+ */
+ void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+ updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
+ // Install newLoc unless the cached location has a higher sequence number or already points
+ // at the same server; retry when the compare-and-set loses a race.
+ for (;;) {
+ HRegionLocation oldLoc = metaRegionLocation.get();
+ if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
+ || oldLoc.getServerName().equals(newLoc.getServerName()))) {
+ return;
+ }
+ if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
+ return;
+ }
+ }
+ }, l -> {
+ // Clear the cached location only when canUpdate(l, oldLoc) allows it; retry when the
+ // compare-and-set loses a race.
+ for (;;) {
+ HRegionLocation oldLoc = metaRegionLocation.get();
+ if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
+ return;
+ }
+ }
+ });
+ }
+
+ /**
+ * Drops the cached meta region location, so the next {@link #getRegionLocation()} call no longer
+ * finds a cached value. The relocate future is left untouched.
+ */
+ void clearCache() {
+ metaRegionLocation.set(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
new file mode 100644
index 0000000..c22d210
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.updateCachedLoation;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The asynchronous locator for regions other than meta.
+ */
+@InterfaceAudience.Private
+class AsyncNonMetaRegionLocator {
+
+ private static final Log LOG = LogFactory.getLog(AsyncNonMetaRegionLocator.class);
+
+ static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
+ "hbase.client.meta.max.concurrent.locate.per.table";
+
+ private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
+
+ private final AsyncConnectionImpl conn;
+
+ private final int maxConcurrentLocateRequestPerTable;
+
+ private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
+
+ private static final class LocateRequest {
+
+ public final byte[] row;
+
+ public final boolean locateToPrevious;
+
+ public LocateRequest(byte[] row, boolean locateToPrevious) {
+ this.row = row;
+ this.locateToPrevious = locateToPrevious;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(row) ^ Boolean.hashCode(locateToPrevious);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != LocateRequest.class) {
+ return false;
+ }
+ LocateRequest that = (LocateRequest) obj;
+ return locateToPrevious == that.locateToPrevious && Bytes.equals(row, that.row);
+ }
+ }
+
+ private static final class TableCache {
+
+ public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
+ new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+
+ public final Set<LocateRequest> pendingRequests = new HashSet<>();
+
+ public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
+ new HashMap<>();
+
+ public boolean hasQuota(int max) {
+ return pendingRequests.size() < max;
+ }
+
+ public boolean isPending(LocateRequest req) {
+ return pendingRequests.contains(req);
+ }
+
+ public void send(LocateRequest req) {
+ pendingRequests.add(req);
+ }
+ }
+
+ /**
+ * Creates a locator bound to the given connection and reads the per-table limit of concurrent
+ * locate requests from the connection's configuration
+ * ({@link #MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE}, falling back to the default of 8).
+ * @param conn the connection used to scan the meta table
+ */
+ AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
+ this.conn = conn;
+ this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
+ MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+ }
+
+ // Looks up the TableCache for tableName through CollectionUtils.computeIfAbsent, with
+ // TableCache::new as the factory used when no entry exists yet.
+ private TableCache getTableCache(TableName tableName) {
+ return computeIfAbsent(cache, tableName, TableCache::new);
+ }
+
+  /**
+   * Removes the cached entry stored under the start key of {@code loc}, but only when that entry
+   * is not newer than {@code loc} (by sequence number) and points at the same server. Does
+   * nothing when the table has no cache or no entry exists for the start key.
+   */
+  private void removeFromCache(HRegionLocation loc) {
+    HRegionInfo region = loc.getRegionInfo();
+    TableCache tableCache = cache.get(region.getTable());
+    if (tableCache != null) {
+      tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (key, cached) -> {
+        // Returning null from the remapping function removes the mapping.
+        if (cached.getSeqNum() <= loc.getSeqNum()
+            && cached.getServerName().equals(loc.getServerName())) {
+          return null;
+        }
+        return cached;
+      });
+    }
+  }
+
+ // return whether we add this loc to cache
+ // First tries putIfAbsent. If an entry already exists for the start key, the new location is
+ // rejected when the existing one has a higher sequence number or the same server name.
+ // Otherwise the entry is re-evaluated atomically through compute(), because it may have changed
+ // in between, and the method reports true only if loc ended up as the mapped value.
+ private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Try adding " + loc + " to cache");
+ }
+ byte[] startKey = loc.getRegionInfo().getStartKey();
+ HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
+ if (oldLoc == null) {
+ return true;
+ }
+ if (oldLoc.getSeqNum() > loc.getSeqNum()
+ || oldLoc.getServerName().equals(loc.getServerName())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
+ + " is newer than us or has the same server name");
+ }
+ return false;
+ }
+ return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
+ // NOTE(review): unlike the check above, this only compares sequence numbers; the server
+ // name mentioned in the trace message below is not re-checked here.
+ if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
+ return loc;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
+ + " is newer than us or has the same server name."
+ + " Maybe it is updated before we replace it");
+ }
+ return oldValue;
+ });
+ }
+
+  /**
+   * Adds the given location to the cache of the table it belongs to, creating that table's cache
+   * if it does not exist yet. Whether the entry is actually stored is decided, and traced, by
+   * {@link #addToCache(TableCache, HRegionLocation)}; the result is ignored here. This overload
+   * is passed as a method reference from {@code updateCachedLocation}.
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+      justification = "Called by lambda expression")
+  private void addToCache(HRegionLocation loc) {
+    // The two-argument overload already traces "Try adding ..." before it touches the cache, so
+    // no second, after-the-fact trace of the same message is emitted here.
+    addToCache(getTableCache(loc.getRegionInfo().getTable()), loc);
+  }
+
+  /**
+   * Completes {@code future} with {@code loc} if the location satisfies the request.
+   * @return true if the future was already done or has just been completed, false if the
+   *         location does not match the request
+   */
+  private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
+      HRegionLocation loc) {
+    if (future.isDone()) {
+      return true;
+    }
+    // A "previous region" lookup matches on the region end key, a normal lookup on containment.
+    boolean matches = req.locateToPrevious
+        ? Bytes.equals(loc.getRegionInfo().getEndKey(), req.row)
+        : loc.getRegionInfo().containsRow(req.row);
+    if (!matches) {
+      return false;
+    }
+    future.complete(loc);
+    return true;
+  }
+
+  /**
+   * Handles the outcome of one meta lookup issued for {@code req}.
+   * <p>
+   * A successfully located region is added to the cache and used to complete every outstanding
+   * request it satisfies. A {@link DoNotRetryIOException} fails the future of {@code req}. Any
+   * other error leaves the future outstanding, so the request stays a candidate for a later
+   * lookup. Finally, if the table still has outstanding requests and quota, one request that is
+   * not currently pending is picked at random and sent to meta.
+   * @param tableName the table the lookup was issued for
+   * @param req the request whose lookup has finished
+   * @param loc the located region, or null if the lookup failed
+   * @param error the failure, or null if the lookup succeeded
+   * @param rowNameInErrorMsg label used for the row in log messages
+   */
+  private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
+      Throwable error, String rowNameInErrorMsg) {
+    if (error != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to locate region in '" + tableName + "', " + rowNameInErrorMsg + "='"
+            + Bytes.toStringBinary(req.row) + "'",
+          error);
+      }
+    }
+    LocateRequest toSend = null;
+    TableCache tableCache = getTableCache(tableName);
+    if (loc != null) {
+      if (!addToCache(tableCache, loc)) {
+        // someone is ahead of us.
+        // NOTE(review): this path neither completes waiting futures nor schedules another
+        // lookup; confirm that whoever updated the cache takes care of them.
+        synchronized (tableCache) {
+          tableCache.pendingRequests.remove(req);
+        }
+        return;
+      }
+    }
+    synchronized (tableCache) {
+      tableCache.pendingRequests.remove(req);
+      if (error instanceof DoNotRetryIOException) {
+        CompletableFuture<?> future = tableCache.allRequests.remove(req);
+        if (future != null) {
+          future.completeExceptionally(error);
+        }
+      }
+      if (loc != null) {
+        // Complete every outstanding request that the new location satisfies.
+        for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
+            tableCache.allRequests.entrySet().iterator(); iter.hasNext();) {
+          Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
+          if (tryComplete(entry.getKey(), entry.getValue(), loc)) {
+            iter.remove();
+          }
+        }
+      }
+      if (!tableCache.allRequests.isEmpty()
+          && tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
+        LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
+            .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new);
+        if (candidates.length > 0) {
+          // TODO: use a better algorithm to send a request which is more likely to fetch a new
+          // location.
+          toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
+          // Mark the request as pending while we still hold the lock. Otherwise it would not
+          // count against the per-table limit and a concurrent completion could pick and send
+          // the very same request again.
+          tableCache.send(toSend);
+        }
+      }
+    }
+    // Issue the lookup outside the lock.
+    if (toSend != null) {
+      if (toSend.locateToPrevious) {
+        locatePreviousInMeta(tableName, toSend);
+      } else {
+        locateInMeta(tableName, toSend);
+      }
+    }
+  }
+
+ /**
+ * Callback for the meta scans issued by locateInMeta and locatePreviousInMeta. Validates the
+ * scan outcome step by step and hands either the resolved location or a descriptive error to
+ * {@code complete}.
+ * @param tableName the table the request was issued for
+ * @param req the request that triggered the scan
+ * @param results the rows returned by the scan; only the first one is inspected
+ * @param error the scan failure, or null if the scan succeeded
+ * @param rowNameInErrorMsg label used for the row in log and error messages
+ */
+ private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,
+ Throwable error, String rowNameInErrorMsg) {
+ if (error != null) {
+ complete(tableName, req, null, error, rowNameInErrorMsg);
+ return;
+ }
+ // No row at all is reported as a missing table.
+ if (results.isEmpty()) {
+ complete(tableName, req, null, new TableNotFoundException(tableName), rowNameInErrorMsg);
+ return;
+ }
+ RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+ + Bytes.toStringBinary(req.row) + "' is " + locs);
+ }
+ if (locs == null || locs.getDefaultRegionLocation() == null) {
+ complete(tableName, req, null,
+ new IOException(String.format("No location found for '%s', %s='%s'", tableName,
+ rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
+ rowNameInErrorMsg);
+ return;
+ }
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ HRegionInfo info = loc.getRegionInfo();
+ if (info == null) {
+ complete(tableName, req, null,
+ new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
+ rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
+ rowNameInErrorMsg);
+ return;
+ }
+ // The returned row may belong to a different table than the one asked for.
+ if (!info.getTable().equals(tableName)) {
+ complete(tableName, req, null,
+ new TableNotFoundException(
+ "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"),
+ rowNameInErrorMsg);
+ return;
+ }
+ if (info.isSplit()) {
+ complete(tableName, req, null,
+ new RegionOfflineException(
+ "the only available region for the required row is a split parent,"
+ + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"),
+ rowNameInErrorMsg);
+ return;
+ }
+ if (info.isOffline()) {
+ complete(tableName, req, null,
+ new RegionOfflineException("the region is offline, could"
+ + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"),
+ rowNameInErrorMsg);
+ return;
+ }
+ if (loc.getServerName() == null) {
+ complete(tableName, req, null,
+ new NoServerForRegionException(
+ String.format("No server address listed for region '%s', %s='%s'",
+ info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
+ rowNameInErrorMsg);
+ return;
+ }
+ // For a "previous region" lookup the found region must end exactly at the requested row;
+ // a mismatch is reported as a non-retryable error.
+ if (req.locateToPrevious && !Bytes.equals(info.getEndKey(), req.row)) {
+ complete(tableName, req, null,
+ new DoNotRetryIOException("The end key of '" + info.getRegionNameAsString() + "' is '"
+ + Bytes.toStringBinary(info.getEndKey()) + "', expected '"
+ + Bytes.toStringBinary(req.row) + "'"),
+ rowNameInErrorMsg);
+ return;
+ }
+ complete(tableName, req, loc, null, rowNameInErrorMsg);
+ }
+
+  /**
+   * Looks for a cached region of the table that contains {@code row}: the entry with the greatest
+   * start key not after the row, accepted only if its end key is empty or lies after the row.
+   * @return the cached location, or null when the cache has no matching region
+   */
+  private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row) {
+    Map.Entry<byte[], HRegionLocation> floor = tableCache.cache.floorEntry(row);
+    if (floor == null) {
+      return null;
+    }
+    HRegionLocation candidate = floor.getValue();
+    byte[] endKey = candidate.getRegionInfo().getEndKey();
+    if (!isEmptyStopRow(endKey) && Bytes.compareTo(row, endKey) >= 0) {
+      // The row lies at or beyond the end of the closest cached region.
+      return null;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Found " + candidate + " in cache for '" + tableName + "', row='"
+          + Bytes.toStringBinary(row) + "'");
+    }
+    return candidate;
+  }
+
+  /**
+   * Looks for the cached region that ends exactly where the current region starts. An empty
+   * start row selects the last cached entry; otherwise the entry with the greatest start key
+   * strictly before the given row is examined.
+   * @return the cached location whose end key equals {@code startRowOfCurrentRegion}, or null
+   */
+  private HRegionLocation locatePreviousInCache(TableCache tableCache, TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    Map.Entry<byte[], HRegionLocation> previous = isEmptyStopRow(startRowOfCurrentRegion)
+        ? tableCache.cache.lastEntry()
+        : tableCache.cache.lowerEntry(startRowOfCurrentRegion);
+    if (previous == null) {
+      return null;
+    }
+    HRegionLocation candidate = previous.getValue();
+    if (!Bytes.equals(candidate.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
+      return null;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Found " + candidate + " in cache for '" + tableName
+          + "', startRowOfCurrentRegion='" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
+    }
+    return candidate;
+  }
+
+ /**
+ * Issues a reversed small scan over the catalog family of the meta table, starting at the
+ * region name built from the table, the requested row and NINES, and forwards the outcome to
+ * {@code onScanComplete} with "row" as the label for messages.
+ * NOTE(review): the second smallScan argument (1) is presumably the row limit; confirm.
+ */
+ private void locateInMeta(TableName tableName, LocateRequest req) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + "' in meta");
+ }
+ byte[] metaKey = createRegionName(tableName, req.row, NINES, false);
+ conn.getRawTable(META_TABLE_NAME)
+ .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+ .whenComplete((results, error) -> onScanComplete(tableName, req, results, error, "row"));
+ }
+
+ /**
+ * Issues a reversed small scan over the catalog family of the meta table to find the region
+ * placed before the one starting at {@code req.row}, and forwards the outcome to
+ * {@code onScanComplete} with "startRowOfCurrentRegion" as the label for messages.
+ */
+ private void locatePreviousInMeta(TableName tableName, LocateRequest req) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
+ + Bytes.toStringBinary(req.row) + "' in meta");
+ }
+ byte[] metaKey;
+ if (isEmptyStopRow(req.row)) {
+ // Empty row: scan backwards from the table name followed by one zero byte (Arrays.copyOf
+ // pads the extra element with 0). Presumably this sorts after all of the table's regions.
+ byte[] binaryTableName = tableName.getName();
+ metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+ } else {
+ metaKey = createRegionName(tableName, req.row, ZEROES, false);
+ }
+ conn.getRawTable(META_TABLE_NAME)
+ .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+ .whenComplete((results, error) -> onScanComplete(tableName, req, results, error,
+ "startRowOfCurrentRegion"));
+ }
+
+  /**
+   * Dispatches a cache lookup to the variant matching the lookup mode.
+   * @param locateToPrevious true to look for the region ending at {@code row}, false to look for
+   *          the region containing {@code row}
+   * @return the cached location, or null on a cache miss
+   */
+  private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
+      boolean locateToPrevious) {
+    if (locateToPrevious) {
+      return locatePreviousInCache(tableCache, tableName, row);
+    }
+    return locateInCache(tableCache, tableName, row);
+  }
+
+ // locateToPrevious is true means we will use the start key of a region to locate the region
+ // placed before it. Used for reverse scan. See the comment of
+ // AsyncRegionLocator.getPreviousRegionLocation.
+ //
+ // Answers from the cache when possible. Otherwise the request is registered in
+ // tableCache.allRequests, so identical requests share one future, and it is sent to meta right
+ // away only if the table still has quota. A request registered without quota is not sent here.
+ private CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ boolean locateToPrevious) {
+ TableCache tableCache = getTableCache(tableName);
+ // Fast path without taking the lock.
+ HRegionLocation loc = locateInCache(tableCache, tableName, row, locateToPrevious);
+ if (loc != null) {
+ return CompletableFuture.completedFuture(loc);
+ }
+ CompletableFuture<HRegionLocation> future;
+ LocateRequest req;
+ boolean sendRequest = false;
+ synchronized (tableCache) {
+ // check again
+ loc = locateInCache(tableCache, tableName, row, locateToPrevious);
+ if (loc != null) {
+ return CompletableFuture.completedFuture(loc);
+ }
+ req = new LocateRequest(row, locateToPrevious);
+ future = tableCache.allRequests.get(req);
+ if (future == null) {
+ future = new CompletableFuture<>();
+ tableCache.allRequests.put(req, future);
+ if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
+ tableCache.send(req);
+ sendRequest = true;
+ }
+ }
+ }
+ // The meta lookup itself is issued after the synchronized block has been left.
+ if (sendRequest) {
+ if (locateToPrevious) {
+ locatePreviousInMeta(tableName, req);
+ } else {
+ locateInMeta(tableName, req);
+ }
+ }
+ return future;
+ }
+
+ /**
+ * Locates the region of {@code tableName} that contains {@code row}.
+ * @return a future for the region location
+ */
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
+ return getRegionLocation(tableName, row, false);
+ }
+
+ // Used for reverse scan. See the comment of AsyncRegionLocator.getPreviousRegionLocation.
+ // TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
+ // of a region.
+ // Locates the region whose end key equals startRowOfCurrentRegion, i.e. the region placed
+ // directly before the current one.
+ CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+ byte[] startRowOfCurrentRegion) {
+ return getRegionLocation(tableName, startRowOfCurrentRegion, true);
+ }
+
+ /**
+ * Delegates to the statically imported {@code updateCachedLoation} helper, passing a getter
+ * that reads the cached location stored under the region's start key, plus the add and remove
+ * actions of this class.
+ * NOTE(review): when each callback is invoked is decided by the helper, which is not shown here.
+ * @param loc the location associated with the failure
+ * @param exception the error that triggered the update
+ */
+ void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+ updateCachedLoation(loc, exception, l -> {
+ TableCache tableCache = cache.get(l.getRegionInfo().getTable());
+ if (tableCache == null) {
+ return null;
+ }
+ return tableCache.cache.get(l.getRegionInfo().getStartKey());
+ }, this::addToCache, this::removeFromCache);
+ }
+
+  /**
+   * Discards everything cached for the given table and fails every request that is still
+   * outstanding for it with an {@link IOException} saying "Cache cleared".
+   */
+  void clearCache(TableName tableName) {
+    TableCache removed = cache.remove(tableName);
+    if (removed == null) {
+      return;
+    }
+    synchronized (removed) {
+      if (removed.allRequests.isEmpty()) {
+        return;
+      }
+      // One shared exception instance is used for all outstanding futures.
+      IOException error = new IOException("Cache cleared");
+      for (CompletableFuture<HRegionLocation> outstanding : removed.allRequests.values()) {
+        outstanding.completeExceptionally(error);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index ae8f2a2..1c3569a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,42 +17,23 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
-import static org.apache.hadoop.hbase.HConstants.NINES;
-import static org.apache.hadoop.hbase.HConstants.ZEROES;
-import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -66,346 +47,73 @@ class AsyncRegionLocator {
private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class);
- private final AsyncConnectionImpl conn;
-
private final HashedWheelTimer retryTimer;
- private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
-
- private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
- new AtomicReference<>();
+ private final AsyncMetaRegionLocator metaRegionLocator;
- private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
- new ConcurrentHashMap<>();
+ private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
- this.conn = conn;
+ this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
+ this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
this.retryTimer = retryTimer;
}
- private CompletableFuture<HRegionLocation> locateMetaRegion() {
- for (;;) {
- HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
- if (metaRegionLocation != null) {
- return CompletableFuture.completedFuture(metaRegionLocation);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Meta region location cache is null, try fetching from registry.");
- }
- if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start fetching meta region location from registry.");
- }
- CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
- conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> {
- if (error != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to fetch meta region location from registry", error);
- }
- metaRelocateFuture.getAndSet(null).completeExceptionally(error);
- return;
- }
- HRegionLocation loc = locs.getDefaultRegionLocation();
- if (LOG.isDebugEnabled()) {
- LOG.debug("The fetched meta region location is " + loc);
- }
- // Here we update cache before reset future, so it is possible that someone can get a
- // stale value. Consider this:
- // 1. update cache
- // 2. someone clear the cache and relocate again
- // 3. the metaRelocateFuture is not null so the old future is used.
- // 4. we clear metaRelocateFuture and complete the future in it with the value being
- // cleared in step 2.
- // But we do not think it is a big deal as it rarely happens, and even if it happens, the
- // caller will retry again later, no correctness problems.
- this.metaRegionLocation.set(loc);
- metaRelocateFuture.set(null);
- future.complete(loc);
- });
- } else {
- CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
- if (future != null) {
- return future;
- }
- }
- }
- }
-
- private static ConcurrentNavigableMap<byte[], HRegionLocation> createTableCache() {
- return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
- }
-
- private void removeFromCache(HRegionLocation loc) {
- ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
- cache.get(loc.getRegionInfo().getTable());
- if (tableCache == null) {
- return;
- }
- tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
- if (oldLoc.getSeqNum() > loc.getSeqNum()
- || !oldLoc.getServerName().equals(loc.getServerName())) {
- return oldLoc;
- }
- return null;
- });
- }
-
- private void addToCache(HRegionLocation loc) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try adding " + loc + " to cache");
- }
- ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = computeIfAbsent(cache,
- loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache);
- byte[] startKey = loc.getRegionInfo().getStartKey();
- HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc);
- if (oldLoc == null) {
- return;
- }
- if (oldLoc.getSeqNum() > loc.getSeqNum()
- || oldLoc.getServerName().equals(loc.getServerName())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
- + " is newer than us or has the same server name");
- }
- return;
- }
- tableCache.compute(startKey, (k, oldValue) -> {
- if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
- return loc;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
- + " is newer than us or has the same server name."
- + " Maybe it is updated before we replace it");
- }
- return oldValue;
- });
- }
-
- private HRegionLocation locateInCache(TableName tableName, byte[] row) {
- ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
- if (tableCache == null) {
- return null;
- }
- Map.Entry<byte[], HRegionLocation> entry = tableCache.floorEntry(row);
- if (entry == null) {
- return null;
- }
- HRegionLocation loc = entry.getValue();
- byte[] endKey = loc.getRegionInfo().getEndKey();
- if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
- return loc;
- } else {
- return null;
- }
- }
-
- private void onScanComplete(CompletableFuture<HRegionLocation> future, TableName tableName,
- byte[] row, List<Result> results, Throwable error, String rowNameInErrorMsg,
- Consumer<HRegionLocation> otherCheck) {
- if (error != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='"
- + Bytes.toStringBinary(row) + "'",
- error);
- }
- future.completeExceptionally(error);
- return;
- }
- if (results.isEmpty()) {
- future.completeExceptionally(new TableNotFoundException(tableName));
- return;
- }
- RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
- if (LOG.isDebugEnabled()) {
- LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
- + Bytes.toStringBinary(row) + "' is " + locs);
- }
- if (locs == null || locs.getDefaultRegionLocation() == null) {
- future.completeExceptionally(
- new IOException(String.format("No location found for '%s', %s='%s'", tableName,
- rowNameInErrorMsg, Bytes.toStringBinary(row))));
- return;
- }
- HRegionLocation loc = locs.getDefaultRegionLocation();
- HRegionInfo info = loc.getRegionInfo();
- if (info == null) {
- future.completeExceptionally(
- new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
- rowNameInErrorMsg, Bytes.toStringBinary(row))));
- return;
- }
- if (!info.getTable().equals(tableName)) {
- future.completeExceptionally(new TableNotFoundException(
- "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
- return;
- }
- if (info.isSplit()) {
- future.completeExceptionally(new RegionOfflineException(
- "the only available region for the required row is a split parent,"
- + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
- return;
- }
- if (info.isOffline()) {
- future.completeExceptionally(new RegionOfflineException("the region is offline, could"
- + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
- return;
- }
- if (loc.getServerName() == null) {
- future.completeExceptionally(new NoServerForRegionException(
- String.format("No server address listed for region '%s', %s='%s'",
- info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row))));
- return;
- }
- otherCheck.accept(loc);
- addToCache(loc);
- future.complete(loc);
- }
-
- private CompletableFuture<HRegionLocation> locateInMeta(TableName tableName, byte[] row) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta");
- }
- CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- byte[] metaKey = createRegionName(tableName, row, NINES, false);
- conn.getRawTable(META_TABLE_NAME)
- .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
- .whenComplete(
- (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
- }));
- return future;
- }
-
- private CompletableFuture<HRegionLocation> locateRegion(TableName tableName, byte[] row) {
- HRegionLocation loc = locateInCache(tableName, row);
- if (loc != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
- + Bytes.toStringBinary(row) + "'");
- }
- return CompletableFuture.completedFuture(loc);
- }
- return locateInMeta(tableName, row);
- }
-
private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
long timeoutNs, Supplier<String> timeoutMsg) {
if (future.isDone() || timeoutNs <= 0) {
return future;
}
- CompletableFuture<HRegionLocation> timeoutFuture = new CompletableFuture<>();
- Timeout timeoutTask = retryTimer.newTimeout(
- t -> timeoutFuture.completeExceptionally(new TimeoutIOException(timeoutMsg.get())), timeoutNs,
- TimeUnit.NANOSECONDS);
- future.whenComplete((loc, error) -> {
- timeoutTask.cancel();
- if (error != null) {
- timeoutFuture.completeExceptionally(error);
- } else {
- timeoutFuture.complete(loc);
+ Timeout timeoutTask = retryTimer.newTimeout(t -> {
+ if (future.isDone()) {
+ return;
+ }
+ future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
+ }, timeoutNs, TimeUnit.NANOSECONDS);
+ return future.whenComplete((loc, error) -> {
+ if (error.getClass() != TimeoutIOException.class) {
+ // cancel timeout task if we are not completed by it.
+ timeoutTask.cancel();
}
});
- return timeoutFuture;
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
long timeoutNs) {
CompletableFuture<HRegionLocation> future =
- tableName.equals(META_TABLE_NAME) ? locateMetaRegion() : locateRegion(tableName, row);
+ tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
+ : nonMetaRegionLocator.getRegionLocation(tableName, row);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ "ms) waiting for region location for " + tableName + ", row='"
+ Bytes.toStringBinary(row) + "'");
}
- private HRegionLocation locatePreviousInCache(TableName tableName,
- byte[] startRowOfCurrentRegion) {
- ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
- if (tableCache == null) {
- return null;
- }
- Map.Entry<byte[], HRegionLocation> entry;
- if (isEmptyStopRow(startRowOfCurrentRegion)) {
- entry = tableCache.lastEntry();
- } else {
- entry = tableCache.lowerEntry(startRowOfCurrentRegion);
- }
- if (entry == null) {
- return null;
- }
- HRegionLocation loc = entry.getValue();
- if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
- return loc;
- } else {
- return null;
- }
- }
-
- private CompletableFuture<HRegionLocation> locatePreviousInMeta(TableName tableName,
- byte[] startRowOfCurrentRegion) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
- + Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta");
- }
- byte[] metaKey;
- if (isEmptyStopRow(startRowOfCurrentRegion)) {
- byte[] binaryTableName = tableName.getName();
- metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
- } else {
- metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
- }
- CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- conn.getRawTable(META_TABLE_NAME)
- .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
- .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
- results, error, "startRowOfCurrentRegion", loc -> {
- HRegionInfo info = loc.getRegionInfo();
- if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) {
- future.completeExceptionally(new IOException("The end key of '"
- + info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey())
- + "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"));
- }
- }));
- return future;
- }
-
- private CompletableFuture<HRegionLocation> locatePreviousRegion(TableName tableName,
- byte[] startRowOfCurrentRegion) {
- HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion);
- if (loc != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
- + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
- }
- return CompletableFuture.completedFuture(loc);
- }
- return locatePreviousInMeta(tableName, startRowOfCurrentRegion);
- }
-
/**
- * Locate the previous region using the current regions start key. Used for reverse scan.
- * <p>
- * TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
- * of a region.
+ * Locate the previous region using the current region's start key. Used for reverse scan as the
+ * end key is not included in a region so we need to treat it differently.
*/
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
byte[] startRowOfCurrentRegion, long timeoutNs) {
- CompletableFuture<HRegionLocation> future = tableName.equals(META_TABLE_NAME)
- ? locateMetaRegion() : locatePreviousRegion(tableName, startRowOfCurrentRegion);
+ // meta region can not be split right now so we call the same method as getRegionLocation.
+ // Change it later if the meta table can have more than one region.
+ CompletableFuture<HRegionLocation> future =
+ tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
+ : nonMetaRegionLocator.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='"
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
}
- private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
+ static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
// Only update when a cached location exists, its seqNum is not newer than loc's, and it is
// on the same server as loc.
return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
&& oldLoc.getServerName().equals(loc.getServerName());
}
- private void updateCachedLoation(HRegionLocation loc, Throwable exception,
+ static void updateCachedLoation(HRegionLocation loc, Throwable exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
@@ -445,34 +153,9 @@ class AsyncRegionLocator {
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
if (loc.getRegionInfo().isMetaTable()) {
- updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
- for (;;) {
- HRegionLocation oldLoc = metaRegionLocation.get();
- if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
- || oldLoc.getServerName().equals(newLoc.getServerName()))) {
- return;
- }
- if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
- return;
- }
- }
- }, l -> {
- for (;;) {
- HRegionLocation oldLoc = metaRegionLocation.get();
- if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
- return;
- }
- }
- });
+ metaRegionLocator.updateCachedLocation(loc, exception);
} else {
- updateCachedLoation(loc, exception, l -> {
- ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
- cache.get(l.getRegionInfo().getTable());
- if (tableCache == null) {
- return null;
- }
- return tableCache.get(l.getRegionInfo().getStartKey());
- }, this::addToCache, this::removeFromCache);
+ nonMetaRegionLocator.updateCachedLocation(loc, exception);
}
}
@@ -480,6 +163,10 @@ class AsyncRegionLocator {
if (LOG.isDebugEnabled()) {
LOG.debug("Clear meta cache for " + tableName);
}
- cache.remove(tableName);
+ if (tableName.equals(META_TABLE_NAME)) {
+ metaRegionLocator.clearCache();
+ } else {
+ nonMetaRegionLocator.clearCache(tableName);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
index fe988aa..d24501d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
@@ -97,7 +98,7 @@ public class TestAsyncGetMultiThread {
@AfterClass
public static void tearDown() throws Exception {
- CONN.close();
+ IOUtils.closeQuietly(CONN);
TEST_UTIL.shutdownMiniCluster();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
new file mode 100644
index 0000000..f3aa26b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncNonMetaRegionLocator {
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static AsyncConnectionImpl CONN;
+
+ private static AsyncNonMetaRegionLocator LOCATOR;
+
+ private static byte[][] SPLIT_KEYS;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+ CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+ LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+ SPLIT_KEYS = new byte[8][];
+ for (int i = 111; i < 999; i += 111) {
+ SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IOUtils.closeQuietly(CONN);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void tearDownAfterTest() throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ if (admin.tableExists(TABLE_NAME)) {
+ if (admin.isTableEnabled(TABLE_NAME)) {
+ TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+ }
+ TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
+ }
+ LOCATOR.clearCache(TABLE_NAME);
+ }
+
+ private void createSingleRegionTable() throws IOException, InterruptedException {
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ }
+
+ @Test
+ public void testNoTable() throws InterruptedException {
+ try {
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ try {
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ }
+
+ @Test
+ public void testDisableTable() throws IOException, InterruptedException {
+ createSingleRegionTable();
+ TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+ try {
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ try {
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+ }
+ }
+
+ private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
+ HRegionLocation loc) {
+ HRegionInfo info = loc.getRegionInfo();
+ assertEquals(TABLE_NAME, info.getTable());
+ assertArrayEquals(startKey, info.getStartKey());
+ assertArrayEquals(endKey, info.getEndKey());
+ assertEquals(serverName, loc.getServerName());
+ }
+
+ @Test
+ public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
+ createSingleRegionTable();
+ ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
+ ThreadLocalRandom.current().nextBytes(randKey);
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
+ // Use a key which is not the endKey of a region will cause error
+ try {
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
+ } catch (ExecutionException e) {
+ assertThat(e.getCause(), instanceOf(IOException.class));
+ assertTrue(e.getCause().getMessage().contains("end key of"));
+ }
+ }
+
+ private void createMultiRegionTable() throws IOException, InterruptedException {
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ }
+
+ private static byte[][] getStartKeys() {
+ byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
+ startKeys[0] = EMPTY_START_ROW;
+ System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
+ return startKeys;
+ }
+
+ private static byte[][] getEndKeys() {
+ byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1); // split keys, plus one slot
+ endKeys[endKeys.length - 1] = EMPTY_END_ROW; // the last region ends with the empty end row
+ return endKeys;
+ }
+
+ @Test
+ public void testMultiRegionTable() throws IOException, InterruptedException {
+ createMultiRegionTable();
+ byte[][] startKeys = getStartKeys();
+ ServerName[] serverNames = new ServerName[startKeys.length];
+ TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .forEach(rs -> {
+ rs.getOnlineRegions(TABLE_NAME).forEach(r -> {
+ serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+ Bytes::compareTo)] = rs.getServerName();
+ });
+ });
+ IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
+ try {
+ assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
+ serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ LOCATOR.clearCache(TABLE_NAME);
+ byte[][] endKeys = getEndKeys();
+ IntStream.range(0, 2).forEach(
+ n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
+ try {
+ assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
+ LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+
+ @Test
+ public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
+ createSingleRegionTable();
+ ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+ HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
+ ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
+ .findAny().get();
+
+ TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()),
+ Bytes.toBytes(newServerName.getServerName()));
+ while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
+ .equals(newServerName)) {
+ Thread.sleep(100);
+ }
+ // Should be same as it is in cache
+ assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ LOCATOR.updateCachedLocation(loc, null);
+ // null error will not trigger a cache cleanup
+ assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+ LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
new file mode 100644
index 0000000..e82703b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static AsyncConnectionImpl CONN;
+
+ private static AsyncNonMetaRegionLocator LOCATOR;
+
+ private static byte[][] SPLIT_KEYS;
+
+ private static int MAX_ALLOWED = 2;
+
+ private static AtomicInteger CONCURRENCY = new AtomicInteger(0);
+
+ private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0);
+
+ public static final class CountingRegionObserver extends BaseRegionObserver {
+
+ @Override
+ public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+ RegionScanner s) throws IOException {
+ if (e.getEnvironment().getRegionInfo().isMetaTable()) {
+ int concurrency = CONCURRENCY.incrementAndGet();
+ for (;;) {
+ int max = MAX_CONCURRENCY.get();
+ if (concurrency <= max) {
+ break;
+ }
+ if (MAX_CONCURRENCY.compareAndSet(max, concurrency)) {
+ break;
+ }
+ }
+ Threads.sleepWithoutInterrupt(10);
+ }
+ return s;
+ }
+
+ @Override
+ public void postScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s)
+ throws IOException {
+ if (e.getEnvironment().getRegionInfo().isMetaTable()) {
+ CONCURRENCY.decrementAndGet();
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName());
+ conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED);
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+ CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+ LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+ SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+ .toArray(byte[][]::new);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IOUtils.closeQuietly(CONN);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
+ throws InterruptedException, ExecutionException {
+ assertEquals(256, futures.size());
+ for (int i = 0; i < futures.size(); i++) {
+ HRegionLocation loc = futures.get(i).get();
+ if (i == 0) {
+ assertTrue(isEmptyStartRow(loc.getRegionInfo().getStartKey()));
+ } else {
+ assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegionInfo().getStartKey()));
+ }
+ if (i == futures.size() - 1) {
+ assertTrue(isEmptyStopRow(loc.getRegionInfo().getEndKey()));
+ } else {
+ assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegionInfo().getEndKey()));
+ }
+ }
+ }
+
+ @Test
+ public void test() throws InterruptedException, ExecutionException {
+ List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 128)
+ .mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+ .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r)).collect(toCollection(ArrayList::new));
+ futures.addAll(IntStream.range(129, 257)
+ .mapToObj(i -> i < 256 ? Bytes.toBytes(String.format("%02x", i)) : EMPTY_START_ROW)
+ .map(r -> LOCATOR.getPreviousRegionLocation(TABLE_NAME, r)).collect(toList()));
+ assertLocs(futures);
+ assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
deleted file mode 100644
index a679192..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncRegionLocator {
-
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("async");
-
- private static byte[] FAMILY = Bytes.toBytes("cf");
-
- private static AsyncConnectionImpl CONN;
-
- private static AsyncRegionLocator LOCATOR;
-
- private static byte[][] SPLIT_KEYS;
-
- @BeforeClass
- public static void setUp() throws Exception {
- TEST_UTIL.startMiniCluster(3);
- TEST_UTIL.getAdmin().setBalancerRunning(false, true);
- CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
- LOCATOR = CONN.getLocator();
- SPLIT_KEYS = new byte[8][];
- for (int i = 111; i < 999; i += 111) {
- SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
- }
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- CONN.close();
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @After
- public void tearDownAfterTest() throws IOException {
- Admin admin = TEST_UTIL.getAdmin();
- if (admin.tableExists(TABLE_NAME)) {
- if (admin.isTableEnabled(TABLE_NAME)) {
- TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
- }
- TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
- }
- LOCATOR.clearCache(TABLE_NAME);
- }
-
- private void createSingleRegionTable() throws IOException, InterruptedException {
- TEST_UTIL.createTable(TABLE_NAME, FAMILY);
- TEST_UTIL.waitTableAvailable(TABLE_NAME);
- }
-
- @Test
- public void testNoTable() throws InterruptedException {
- try {
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
- } catch (ExecutionException e) {
- assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
- }
- try {
- LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
- } catch (ExecutionException e) {
- assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
- }
- }
-
- @Test
- public void testDisableTable() throws IOException, InterruptedException {
- createSingleRegionTable();
- TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
- try {
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
- } catch (ExecutionException e) {
- assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
- }
- try {
- LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
- } catch (ExecutionException e) {
- assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
- }
- }
-
- private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
- HRegionLocation loc) {
- HRegionInfo info = loc.getRegionInfo();
- assertEquals(TABLE_NAME, info.getTable());
- assertArrayEquals(startKey, info.getStartKey());
- assertArrayEquals(endKey, info.getEndKey());
- assertEquals(serverName, loc.getServerName());
- }
-
- @Test
- public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
- createSingleRegionTable();
- ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
- byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
- ThreadLocalRandom.current().nextBytes(randKey);
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, randKey, 0L).get());
- // Use a key which is not the endKey of a region will cause error
- try {
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }, 0L).get());
- } catch (ExecutionException e) {
- assertThat(e.getCause(), instanceOf(IOException.class));
- assertTrue(e.getCause().getMessage().contains("end key of"));
- }
- }
-
- private void createMultiRegionTable() throws IOException, InterruptedException {
- TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
- TEST_UTIL.waitTableAvailable(TABLE_NAME);
- }
-
- private static byte[][] getStartKeys() {
- byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
- startKeys[0] = EMPTY_START_ROW;
- System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
- return startKeys;
- }
-
- private static byte[][] getEndKeys() {
- byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1);
- endKeys[endKeys.length - 1] = EMPTY_START_ROW;
- return endKeys;
- }
-
- @Test
- public void testMultiRegionTable() throws IOException, InterruptedException {
- createMultiRegionTable();
- byte[][] startKeys = getStartKeys();
- ServerName[] serverNames = new ServerName[startKeys.length];
- TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
- .forEach(rs -> {
- rs.getOnlineRegions(TABLE_NAME).forEach(r -> {
- serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
- Bytes::compareTo)] = rs.getServerName();
- });
- });
- IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
- try {
- assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
- serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], 0L).get());
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }));
- LOCATOR.clearCache(TABLE_NAME);
- byte[][] endKeys = getEndKeys();
- IntStream.range(0, 2).forEach(
- n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
- try {
- assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
- LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i], 0L).get());
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }));
- }
-
- @Test
- public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
- createSingleRegionTable();
- ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
- HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
- ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
- .findAny().get();
-
- TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()),
- Bytes.toBytes(newServerName.getServerName()));
- while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
- .equals(newServerName)) {
- Thread.sleep(100);
- }
- // Should be same as it is in cache
- assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
- LOCATOR.updateCachedLocation(loc, null);
- // null error will not trigger a cache cleanup
- assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
- LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index 2a902a6..40190cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -90,7 +91,7 @@ public class TestAsyncRegionLocatorTimeout {
@AfterClass
public static void tearDown() throws Exception {
- CONN.close();
+ IOUtils.closeQuietly(CONN);
TEST_UTIL.shutdownMiniCluster();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 7a85727..bb6cc2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.IntStream;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -103,7 +104,7 @@ public class TestAsyncTable {
@AfterClass
public static void tearDownAfterClass() throws Exception {
- ASYNC_CONN.close();
+ IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index c8e1c7a..ea999f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
@@ -88,7 +89,7 @@ public class TestAsyncTableNoncedRetry {
@AfterClass
public static void tearDownAfterClass() throws Exception {
- ASYNC_CONN.close();
+ IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}