You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/04 06:40:34 UTC
[08/15] hbase git commit: HBASE-17356 Add replica get support
HBASE-17356 Add replica get support
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/db66e6cc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/db66e6cc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/db66e6cc
Branch: refs/heads/HBASE-21512
Commit: db66e6cc9e1c6ea027631388aba688cb623b7d0a
Parents: e4b6b4a
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 1 21:59:37 2019 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Jan 3 08:38:20 2019 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/RegionLocations.java | 30 +-
.../client/AsyncBatchRpcRetryingCaller.java | 114 +-
.../client/AsyncConnectionConfiguration.java | 12 +
.../hbase/client/AsyncConnectionImpl.java | 1 -
.../hbase/client/AsyncMetaRegionLocator.java | 125 +-
.../hbase/client/AsyncNonMetaRegionLocator.java | 291 +--
.../hadoop/hbase/client/AsyncRegionLocator.java | 129 +-
.../hbase/client/AsyncRegionLocatorHelper.java | 147 ++
.../hbase/client/AsyncRpcRetryingCaller.java | 15 +-
.../client/AsyncRpcRetryingCallerFactory.java | 55 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 71 +-
.../hbase/client/AsyncTableRegionLocator.java | 28 +-
.../client/AsyncTableRegionLocatorImpl.java | 6 +-
.../hbase/client/ConnectionConfiguration.java | 5 +-
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 2033 +++++++++---------
.../hadoop/hbase/client/RawAsyncTableImpl.java | 208 +-
.../apache/hadoop/hbase/util/FutureUtils.java | 60 +
.../hbase/client/RegionReplicaTestHelper.java | 161 ++
.../client/TestAsyncMetaRegionLocator.java | 55 +-
.../client/TestAsyncNonMetaRegionLocator.java | 126 +-
...syncNonMetaRegionLocatorConcurrenyLimit.java | 20 +-
...TestAsyncSingleRequestRpcRetryingCaller.java | 56 +-
.../client/TestAsyncTableLocatePrefetch.java | 4 +-
.../client/TestAsyncTableRegionReplicasGet.java | 204 ++
.../hbase/client/TestZKAsyncRegistry.java | 44 +-
25 files changed, 2366 insertions(+), 1634 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index fd6f3c7..f98bf03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -56,8 +56,8 @@ public class RegionLocations {
int index = 0;
for (HRegionLocation loc : locations) {
if (loc != null) {
- if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
- maxReplicaId = loc.getRegionInfo().getReplicaId();
+ if (loc.getRegion().getReplicaId() >= maxReplicaId) {
+ maxReplicaId = loc.getRegion().getReplicaId();
maxReplicaIdIndex = index;
}
}
@@ -72,7 +72,7 @@ public class RegionLocations {
this.locations = new HRegionLocation[maxReplicaId + 1];
for (HRegionLocation loc : locations) {
if (loc != null) {
- this.locations[loc.getRegionInfo().getReplicaId()] = loc;
+ this.locations[loc.getRegion().getReplicaId()] = loc;
}
}
}
@@ -146,7 +146,7 @@ public class RegionLocations {
public RegionLocations remove(HRegionLocation location) {
if (location == null) return this;
if (location.getRegion() == null) return this;
- int replicaId = location.getRegionInfo().getReplicaId();
+ int replicaId = location.getRegion().getReplicaId();
if (replicaId >= locations.length) return this;
// check whether something to remove. HRL.compareTo() compares ONLY the
@@ -203,14 +203,14 @@ public class RegionLocations {
// in case of region replication going down, we might have a leak here.
int max = other.locations.length;
- HRegionInfo regionInfo = null;
+ RegionInfo regionInfo = null;
for (int i = 0; i < max; i++) {
HRegionLocation thisLoc = this.getRegionLocation(i);
HRegionLocation otherLoc = other.getRegionLocation(i);
- if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) {
+ if (regionInfo == null && otherLoc != null && otherLoc.getRegion() != null) {
// regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that
// all replica region infos belong to the same region with same region id.
- regionInfo = otherLoc.getRegionInfo();
+ regionInfo = otherLoc.getRegion();
}
HRegionLocation selectedLoc = selectRegionLocation(thisLoc,
@@ -232,7 +232,7 @@ public class RegionLocations {
for (int i=0; i < newLocations.length; i++) {
if (newLocations[i] != null) {
if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo,
- newLocations[i].getRegionInfo())) {
+ newLocations[i].getRegion())) {
newLocations[i] = null;
}
}
@@ -273,9 +273,9 @@ public class RegionLocations {
boolean checkForEquals, boolean force) {
assert location != null;
- int replicaId = location.getRegionInfo().getReplicaId();
+ int replicaId = location.getRegion().getReplicaId();
- HRegionLocation oldLoc = getRegionLocation(location.getRegionInfo().getReplicaId());
+ HRegionLocation oldLoc = getRegionLocation(location.getRegion().getReplicaId());
HRegionLocation selectedLoc = selectRegionLocation(oldLoc, location,
checkForEquals, force);
@@ -288,8 +288,8 @@ public class RegionLocations {
// ensure that all replicas share the same start code. Otherwise delete them
for (int i=0; i < newLocations.length; i++) {
if (newLocations[i] != null) {
- if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(),
- newLocations[i].getRegionInfo())) {
+ if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegion(),
+ newLocations[i].getRegion())) {
newLocations[i] = null;
}
}
@@ -317,8 +317,8 @@ public class RegionLocations {
public HRegionLocation getRegionLocationByRegionName(byte[] regionName) {
for (HRegionLocation loc : locations) {
if (loc != null) {
- if (Bytes.equals(loc.getRegionInfo().getRegionName(), regionName)
- || Bytes.equals(loc.getRegionInfo().getEncodedNameAsBytes(), regionName)) {
+ if (Bytes.equals(loc.getRegion().getRegionName(), regionName)
+ || Bytes.equals(loc.getRegion().getEncodedNameAsBytes(), regionName)) {
return loc;
}
}
@@ -331,7 +331,7 @@ public class RegionLocations {
}
public HRegionLocation getDefaultRegionLocation() {
- return locations[HRegionInfo.DEFAULT_REPLICA_ID];
+ return locations[RegionInfo.DEFAULT_REPLICA_ID];
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 51b89a9..e268b2e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -23,8 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.ArrayList;
@@ -43,24 +42,26 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Retry caller for batch.
@@ -121,10 +122,10 @@ class AsyncBatchRpcRetryingCaller<T> {
private static final class ServerRequest {
public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
- new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
public void addAction(HRegionLocation loc, Action action) {
- computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
+ computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
() -> new RegionRequest(loc)).actions.add(action);
}
}
@@ -173,11 +174,10 @@ class AsyncBatchRpcRetryingCaller<T> {
Throwable error, ServerName serverName) {
if (tries > startLogErrorsCnt) {
String regions =
- regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
- .collect(Collectors.joining(",", "[", "]"));
- LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
- + " failed, tries=" + tries,
- error);
+ regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
+ .collect(Collectors.joining(",", "[", "]"));
+ LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
+ " failed, tries=" + tries, error);
}
}
@@ -191,7 +191,7 @@ class AsyncBatchRpcRetryingCaller<T> {
errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
}
errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
- getExtraContextForError(serverName)));
+ getExtraContextForError(serverName)));
}
private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
@@ -204,7 +204,7 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
ThrowableWithExtraContext errorWithCtx =
- new ThrowableWithExtraContext(error, currentTime, extras);
+ new ThrowableWithExtraContext(error, currentTime, extras);
List<ThrowableWithExtraContext> errors = removeErrors(action);
if (errors == null) {
errors = Collections.singletonList(errorWithCtx);
@@ -227,7 +227,7 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
future.completeExceptionally(new RetriesExhaustedException(tries,
- Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
+ Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
});
}
@@ -242,9 +242,9 @@ class AsyncBatchRpcRetryingCaller<T> {
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// action list.
- RequestConverter.buildNoDataRegionActions(entry.getKey(),
- entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
- mutationBuilder, nonceGroup, rowMutationsIndexMap);
+ RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
+ multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
+ rowMutationsIndexMap);
}
return multiRequestBuilder.build();
}
@@ -254,15 +254,15 @@ class AsyncBatchRpcRetryingCaller<T> {
RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
if (result == null) {
- LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
- + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
- + regionReq.loc.getRegionInfo().getRegionNameAsString());
+ LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
+ Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
+ regionReq.loc.getRegion().getRegionNameAsString());
addError(action, new RuntimeException("Invalid response"), serverName);
failedActions.add(action);
} else if (result instanceof Throwable) {
Throwable error = translateException((Throwable) result);
logException(tries, () -> Stream.of(regionReq), error, serverName);
- conn.getLocator().updateCachedLocation(regionReq.loc, error);
+ conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
getExtraContextForError(serverName));
@@ -281,20 +281,19 @@ class AsyncBatchRpcRetryingCaller<T> {
RegionResult regionResult = resp.getResults().get(rn);
Throwable regionException = resp.getException(rn);
if (regionResult != null) {
- regionReq.actions.forEach(
- action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
- regionException));
+ regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
+ regionResult, failedActions, regionException));
} else {
Throwable error;
if (regionException == null) {
- LOG.error(
- "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+ LOG
+ .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
error = new RuntimeException("Invalid response");
} else {
error = translateException(regionException);
}
logException(tries, () -> Stream.of(regionReq), error, serverName);
- conn.getLocator().updateCachedLocation(regionReq.loc, error);
+ conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(regionReq.actions.stream(), tries, error, serverName);
return;
@@ -314,8 +313,7 @@ class AsyncBatchRpcRetryingCaller<T> {
remainingNs = remainingTimeNs();
if (remainingNs <= 0) {
failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
- .flatMap(r -> r.actions.stream()),
- tries);
+ .flatMap(r -> r.actions.stream()), tries);
return;
}
} else {
@@ -366,15 +364,15 @@ class AsyncBatchRpcRetryingCaller<T> {
ServerName serverName) {
Throwable error = translateException(t);
logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
- actionsByRegion
- .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));
+ actionsByRegion.forEach(
+ (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
serverName);
return;
}
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries);
}
@@ -407,30 +405,30 @@ class AsyncBatchRpcRetryingCaller<T> {
}
ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
- CompletableFuture.allOf(actions
- .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
- RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
- if (error != null) {
- error = translateException(error);
- if (error instanceof DoNotRetryIOException) {
- failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
- return;
- }
- addError(action, error, null);
- locateFailed.add(action);
- } else {
- computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
- .addAction(loc, action);
+ addListener(CompletableFuture.allOf(actions
+ .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
+ RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
+ if (error != null) {
+ error = translateException(error);
+ if (error instanceof DoNotRetryIOException) {
+ failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
+ return;
}
- }))
- .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
- if (!actionsByServer.isEmpty()) {
- send(actionsByServer, tries);
- }
- if (!locateFailed.isEmpty()) {
- tryResubmit(locateFailed.stream(), tries);
+ addError(action, error, null);
+ locateFailed.add(action);
+ } else {
+ computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
+ action);
}
- });
+ }))
+ .toArray(CompletableFuture[]::new)), (v, r) -> {
+ if (!actionsByServer.isEmpty()) {
+ send(actionsByServer, tries);
+ }
+ if (!locateFailed.isEmpty()) {
+ tryResubmit(locateFailed.stream(), tries);
+ }
+ });
}
public List<CompletableFuture<T>> call() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 915e9dd..fa051a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@@ -94,6 +96,10 @@ class AsyncConnectionConfiguration {
private final long writeBufferPeriodicFlushTimeoutNs;
+ // this is for supporting region replica get, if the primary does not finish within this
+ // timeout, we will send request to secondaries.
+ private final long primaryCallTimeoutNs;
+
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -124,6 +130,8 @@ class AsyncConnectionConfiguration {
this.writeBufferPeriodicFlushTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
+ this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
+ conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@@ -181,4 +189,8 @@ class AsyncConnectionConfiguration {
long getWriteBufferPeriodicFlushTimeoutNs() {
return writeBufferPeriodicFlushTimeoutNs;
}
+
+ long getPrimaryCallTimeoutNs() {
+ return primaryCallTimeoutNs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 078395b..361d5b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -152,7 +152,6 @@ class AsyncConnectionImpl implements AsyncConnection {
}
// we will override this method for testing retry caller, so do not remove this method.
- @VisibleForTesting
AsyncRegionLocator getLocator() {
return locator;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 06b5b57..9fef15d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,43 +41,43 @@ class AsyncMetaRegionLocator {
private final AsyncRegistry registry;
- private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+ private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
- private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
- new AtomicReference<>();
+ private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
+ new AtomicReference<>();
AsyncMetaRegionLocator(AsyncRegistry registry) {
this.registry = registry;
}
- CompletableFuture<HRegionLocation> getRegionLocation(boolean reload) {
+ /**
+ * Get the region locations for meta region. If the location for the given replica is not
+ * available in the cached locations, then fetch from the HBase cluster.
+ * <p/>
+ * The <code>replicaId</code> parameter is important. If the region replication config for meta
+ * region is changed, then the cached region locations may not have the locations for new
+ * replicas. If we do not check the location for the given replica, we will always return the
+ * cached region locations and cause an infinite loop.
+ */
+ CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
for (;;) {
if (!reload) {
- HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
- if (metaRegionLocation != null) {
- return CompletableFuture.completedFuture(metaRegionLocation);
+ RegionLocations locs = this.metaRegionLocations.get();
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
}
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Meta region location cache is null, try fetching from registry.");
- }
+ LOG.trace("Meta region location cache is null, try fetching from registry.");
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start fetching meta region location from registry.");
- }
- CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+ LOG.debug("Start fetching meta region location from registry.");
+ CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
registry.getMetaRegionLocation().whenComplete((locs, error) -> {
if (error != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to fetch meta region location from registry", error);
- }
+ LOG.debug("Failed to fetch meta region location from registry", error);
metaRelocateFuture.getAndSet(null).completeExceptionally(error);
return;
}
- HRegionLocation loc = locs.getDefaultRegionLocation();
- if (LOG.isDebugEnabled()) {
- LOG.debug("The fetched meta region location is " + loc);
- }
+ // Pass locs as a logger argument so the {} placeholder is actually substituted; concatenating
+ // it would print a literal "{}" followed by the value.
+ LOG.debug("The fetched meta region location is {}", locs);
// Here we update cache before reset future, so it is possible that someone can get a
// stale value. Consider this:
// 1. update cache
@@ -82,12 +87,12 @@ class AsyncMetaRegionLocator {
// cleared in step 2.
// But we do not think it is a big deal as it rarely happens, and even if it happens, the
// caller will retry again later, no correctness problems.
- this.metaRegionLocation.set(loc);
+ this.metaRegionLocations.set(locs);
metaRelocateFuture.set(null);
- future.complete(loc);
+ future.complete(locs);
});
} else {
- CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+ CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
if (future != null) {
return future;
}
@@ -95,30 +100,56 @@ class AsyncMetaRegionLocator {
}
}
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
- AsyncRegionLocator.updateCachedLocation(loc, exception, l -> metaRegionLocation.get(),
- newLoc -> {
- for (;;) {
- HRegionLocation oldLoc = metaRegionLocation.get();
- if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() ||
- oldLoc.getServerName().equals(newLoc.getServerName()))) {
- return;
- }
- if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
- return;
- }
- }
- }, l -> {
- for (;;) {
- HRegionLocation oldLoc = metaRegionLocation.get();
- if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
- return;
- }
+ private HRegionLocation getCacheLocation(HRegionLocation loc) {
+ RegionLocations locs = metaRegionLocations.get();
+ return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+ }
+
+ /**
+ * Stores the given location in the cached meta region locations, looping until either the
+ * compare-and-set on the cache succeeds or the cached entry for the same replica is kept.
+ * <p/>
+ * The cached entry for the replica is kept when its seqNum is greater than that of
+ * <code>loc</code>, or when it already has the same server name.
+ * @param loc the location to put into the cache
+ */
+ private void addLocationToCache(HRegionLocation loc) {
+ for (;;) {
+ int replicaId = loc.getRegion().getReplicaId();
+ RegionLocations oldLocs = metaRegionLocations.get();
+ if (oldLocs == null) {
+ if (metaRegionLocations.compareAndSet(null, createRegionLocations(loc))) {
+ return;
+ }
+ // Lost the race: another thread set the cache after we read null. Re-read the cache
+ // instead of falling through and dereferencing the null oldLocs.
+ continue;
+ }
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
+ if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
+ oldLoc.getServerName().equals(loc.getServerName()))) {
+ return;
+ }
+ RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
+ if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+ return;
+ }
+ }
+ }
+
+ private void removeLocationFromCache(HRegionLocation loc) {
+ for (;;) {
+ RegionLocations oldLocs = metaRegionLocations.get();
+ if (oldLocs == null) {
+ return;
+ }
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+ if (!canUpdateOnError(loc, oldLoc)) {
+ return;
+ }
+ RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+ if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+ return;
+ }
+ }
+ }
+
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
+ this::addLocationToCache, this::removeLocationFromCache);
}
void clearCache() {
- metaRegionLocation.set(null);
+ metaRegionLocations.set(null);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 7e3d56c..1fcfbb0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.NINES;
import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
@@ -39,7 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -53,6 +60,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
/**
* The asynchronous locator for regions other than meta.
@@ -83,9 +91,9 @@ class AsyncNonMetaRegionLocator {
private static final class LocateRequest {
- public final byte[] row;
+ private final byte[] row;
- public final RegionLocateType locateType;
+ private final RegionLocateType locateType;
public LocateRequest(byte[] row, RegionLocateType locateType) {
this.row = row;
@@ -109,12 +117,12 @@ class AsyncNonMetaRegionLocator {
private static final class TableCache {
- public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
+ private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
- public final Set<LocateRequest> pendingRequests = new HashSet<>();
+ private final Set<LocateRequest> pendingRequests = new HashSet<>();
- public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
+ private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
new LinkedHashMap<>();
public boolean hasQuota(int max) {
@@ -133,25 +141,29 @@ class AsyncNonMetaRegionLocator {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}
- public void clearCompletedRequests(Optional<HRegionLocation> location) {
- for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
+ public void clearCompletedRequests(Optional<RegionLocations> locations) {
+ for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
- if (tryComplete(entry.getKey(), entry.getValue(), location)) {
+ Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
+ if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
iter.remove();
}
}
}
- private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
- Optional<HRegionLocation> location) {
+ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+ Optional<RegionLocations> locations) {
if (future.isDone()) {
return true;
}
- if (!location.isPresent()) {
+ if (!locations.isPresent()) {
return false;
}
- HRegionLocation loc = location.get();
+ RegionLocations locs = locations.get();
+ HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
+ // we should at least have one location available, otherwise the request should fail and
+ // should not arrive here
+ assert loc != null;
boolean completed;
if (req.locateType.equals(RegionLocateType.BEFORE)) {
// for locating the row before current row, the common case is to find the previous region
@@ -166,7 +178,7 @@ class AsyncNonMetaRegionLocator {
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
- future.complete(loc);
+ future.complete(locs);
return true;
} else {
return false;
@@ -186,59 +198,59 @@ class AsyncNonMetaRegionLocator {
return computeIfAbsent(cache, tableName, TableCache::new);
}
- private void removeFromCache(HRegionLocation loc) {
- TableCache tableCache = cache.get(loc.getRegion().getTable());
- if (tableCache == null) {
- return;
+ /**
+ * Check whether the two {@link RegionLocations} hold the same locations, slot by slot. Only the
+ * sequence number and the server name of each location are compared.
+ * @return {@code true} if both arrays have the same length and every slot is either null in both
+ * or has the same sequence number and server name in both
+ */
+ private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+ HRegionLocation[] locArr1 = locs1.getRegionLocations();
+ HRegionLocation[] locArr2 = locs2.getRegionLocations();
+ if (locArr1.length != locArr2.length) {
+ return false;
}
- tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> {
- if (oldLoc.getSeqNum() > loc.getSeqNum() ||
- !oldLoc.getServerName().equals(loc.getServerName())) {
- return oldLoc;
+ for (int i = 0; i < locArr1.length; i++) {
+ // do not need to compare region info
+ HRegionLocation loc1 = locArr1[i];
+ HRegionLocation loc2 = locArr2[i];
+ if (loc1 == null) {
+ if (loc2 != null) {
+ return false;
+ }
+ } else {
+ if (loc2 == null) {
+ return false;
+ }
+ if (loc1.getSeqNum() != loc2.getSeqNum()) {
+ return false;
+ }
+ // a different server name (null-safe comparison) means the two locations are not equal
+ if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+ return false;
+ }
}
- return null;
- });
+ }
+ return true;
}
// return whether we add this loc to cache
- private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try adding " + loc + " to cache");
- }
- byte[] startKey = loc.getRegion().getStartKey();
- HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
- if (oldLoc == null) {
- return true;
- }
- if (oldLoc.getSeqNum() > loc.getSeqNum() ||
- oldLoc.getServerName().equals(loc.getServerName())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc +
- " is newer than us or has the same server name");
- }
- return false;
- }
- return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
- if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
- return loc;
+ private boolean addToCache(TableCache tableCache, RegionLocations locs) {
+ LOG.trace("Try adding {} to cache", locs);
+ byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
+ if (oldLocs == null) {
+ return true;
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue +
+ RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs);
+ if (isEqual(mergedLocs, oldLocs)) {
+ // the merged one is the same with the old one, give up
+ LOG.trace("Will not add {} to cache because the old value {} " +
" is newer than us or has the same server name." +
- " Maybe it is updated before we replace it");
+ " Maybe it is updated before we replace it", locs, oldLocs);
+ return false;
}
- return oldValue;
- });
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "Called by lambda expression")
- private void addToCache(HRegionLocation loc) {
- addToCache(getTableCache(loc.getRegion().getTable()), loc);
- LOG.trace("Try adding {} to cache", loc);
+ if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
+ return true;
+ }
+ }
}
- private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
+ private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
Throwable error) {
if (error != null) {
LOG.warn("Failed to locate region in '" + tableName + "', row='" +
@@ -246,8 +258,8 @@ class AsyncNonMetaRegionLocator {
}
Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
- if (loc != null) {
- if (!addToCache(tableCache, loc)) {
+ if (locs != null) {
+ if (!addToCache(tableCache, locs)) {
// someone is ahead of us.
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
@@ -269,7 +281,7 @@ class AsyncNonMetaRegionLocator {
future.completeExceptionally(error);
}
}
- tableCache.clearCompletedRequests(Optional.ofNullable(loc));
+ tableCache.clearCompletedRequests(Optional.ofNullable(locs));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
@@ -286,9 +298,11 @@ class AsyncNonMetaRegionLocator {
Bytes.toStringBinary(req.row), req.locateType, locs);
}
+ // the default region location should always be presented when fetching from meta, otherwise
+ // let's fail the request.
if (locs == null || locs.getDefaultRegionLocation() == null) {
complete(tableName, req, null,
- new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
+ new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
tableName, Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
@@ -296,58 +310,60 @@ class AsyncNonMetaRegionLocator {
RegionInfo info = loc.getRegion();
if (info == null) {
complete(tableName, req, null,
- new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+ new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
tableName, Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
if (info.isSplitParent()) {
return false;
}
- if (loc.getServerName() == null) {
- complete(tableName, req, null,
- new IOException(
- String.format("No server address listed for region '%s', row='%s', locateType=%s",
- info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
- return true;
- }
- complete(tableName, req, loc, null);
+ complete(tableName, req, locs, null);
return true;
}
- private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
- Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
+ private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
+ int replicaId) {
+ Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
if (entry == null) {
return null;
}
- HRegionLocation loc = entry.getValue();
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
byte[] endKey = loc.getRegion().getEndKey();
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
- Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
- return loc;
+ return locs;
} else {
return null;
}
}
- private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
- byte[] row) {
+ private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
+ byte[] row, int replicaId) {
boolean isEmptyStopRow = isEmptyStopRow(row);
- Map.Entry<byte[], HRegionLocation> entry =
- isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
+ Map.Entry<byte[], RegionLocations> entry =
+ isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
if (entry == null) {
return null;
}
- HRegionLocation loc = entry.getValue();
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
(!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
- Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
- return loc;
+ return locs;
} else {
return null;
}
@@ -390,8 +406,8 @@ class AsyncNonMetaRegionLocator {
if (tableNotFound) {
complete(tableName, req, null, new TableNotFoundException(tableName));
} else if (!completeNormally) {
- complete(tableName, req, null, new IOException(
- "Unable to find region for " + Bytes.toStringBinary(req.row) + " in " + tableName));
+ complete(tableName, req, null, new IOException("Unable to find region for '" +
+ Bytes.toStringBinary(req.row) + "' in " + tableName));
}
}
@@ -423,13 +439,12 @@ class AsyncNonMetaRegionLocator {
continue;
}
RegionInfo info = loc.getRegion();
- if (info == null || info.isOffline() || info.isSplitParent() ||
- loc.getServerName() == null) {
+ if (info == null || info.isOffline() || info.isSplitParent()) {
continue;
}
- if (addToCache(tableCache, loc)) {
+ if (addToCache(tableCache, locs)) {
synchronized (tableCache) {
- tableCache.clearCompletedRequests(Optional.of(loc));
+ tableCache.clearCompletedRequests(Optional.of(locs));
}
}
}
@@ -438,36 +453,36 @@ class AsyncNonMetaRegionLocator {
});
}
- private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
- RegionLocateType locateType) {
+ private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
+ int replicaId, RegionLocateType locateType) {
return locateType.equals(RegionLocateType.BEFORE)
- ? locateRowBeforeInCache(tableCache, tableName, row)
- : locateRowInCache(tableCache, tableName, row);
+ ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
+ : locateRowInCache(tableCache, tableName, row, replicaId);
}
// locateToPrevious is true means we will use the start key of a region to locate the region
// placed before it. Used for reverse scan. See the comment of
// AsyncRegionLocator.getPreviousRegionLocation.
- private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
- byte[] row, RegionLocateType locateType, boolean reload) {
+ private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
+ byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
// AFTER should be convert to CURRENT before calling this method
assert !locateType.equals(RegionLocateType.AFTER);
TableCache tableCache = getTableCache(tableName);
if (!reload) {
- HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
- if (loc != null) {
- return CompletableFuture.completedFuture(loc);
+ RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
}
}
- CompletableFuture<HRegionLocation> future;
+ CompletableFuture<RegionLocations> future;
LocateRequest req;
boolean sendRequest = false;
synchronized (tableCache) {
// check again
if (!reload) {
- HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
- if (loc != null) {
- return CompletableFuture.completedFuture(loc);
+ RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
}
}
req = new LocateRequest(row, locateType);
@@ -487,28 +502,58 @@ class AsyncNonMetaRegionLocator {
return future;
}
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType locateType, boolean reload) {
- if (locateType.equals(RegionLocateType.BEFORE)) {
- return getRegionLocationInternal(tableName, row, locateType, reload);
- } else {
- // as we know the exact row after us, so we can just create the new row, and use the same
- // algorithm to locate it.
- if (locateType.equals(RegionLocateType.AFTER)) {
- row = createClosestRowAfter(row);
- }
- return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload);
+ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+ int replicaId, RegionLocateType locateType, boolean reload) {
+ // as we know the exact row after us, so we can just create the new row, and use the same
+ // algorithm to locate it.
+ if (locateType.equals(RegionLocateType.AFTER)) {
+ row = createClosestRowAfter(row);
+ locateType = RegionLocateType.CURRENT;
}
+ return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
}
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
- AsyncRegionLocator.updateCachedLocation(loc, exception, l -> {
- TableCache tableCache = cache.get(l.getRegion().getTable());
- if (tableCache == null) {
- return null;
+ /**
+ * Remove the location for the replica of {@code loc} from the cached entry stored under the
+ * start key of its region. The entry is swapped with remove/replace against the value that was
+ * read, and the whole read-check-swap is retried if the entry changed in between.
+ */
+ private void removeLocationFromCache(HRegionLocation loc) {
+ TableCache tableCache = cache.get(loc.getRegion().getTable());
+ if (tableCache == null) {
+ return;
+ }
+ byte[] startKey = loc.getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = tableCache.cache.get(startKey);
+ if (oldLocs == null) {
+ // nothing cached under this start key (any more), so there is nothing to remove
+ return;
+ }
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+ if (!canUpdateOnError(loc, oldLoc)) {
+ return;
}
- return tableCache.cache.get(l.getRegion().getStartKey());
- }, this::addToCache, this::removeFromCache);
+ RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+ if (newLocs == null) {
+ // no replica location left, drop the whole entry
+ if (tableCache.cache.remove(startKey, oldLocs)) {
+ return;
+ }
+ } else {
+ if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
+ return;
+ }
+ }
+ }
+ }
+
+ // Wrap the single location into a RegionLocations and add it to the cache of its table, creating
+ // the table cache if needed.
+ private void addLocationToCache(HRegionLocation loc) {
+ addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
+ }
+
+ // Look up the cached location with the same table, start key and replica id as the given one.
+ // Returns null if there is no cache for the table or no entry under the start key.
+ private HRegionLocation getCachedLocation(HRegionLocation loc) {
+ RegionInfo region = loc.getRegion();
+ TableCache cacheOfTable = cache.get(region.getTable());
+ if (cacheOfTable == null) {
+ return null;
+ }
+ RegionLocations cached = cacheOfTable.cache.get(region.getStartKey());
+ if (cached == null) {
+ return null;
+ }
+ return cached.getRegionLocation(region.getReplicaId());
+ }
+
+ // Delegate to the shared helper, passing this locator's cache lookup, add and remove operations
+ // as callbacks.
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+ this::addLocationToCache, this::removeLocationFromCache);
}
void clearCache(TableName tableName) {
@@ -526,11 +571,11 @@ class AsyncNonMetaRegionLocator {
// only used for testing whether we have cached the location for a region.
@VisibleForTesting
- HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) {
+ RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
TableCache tableCache = cache.get(tableName);
if (tableCache == null) {
return null;
}
- return locateRowInCache(tableCache, tableName, row);
+ return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 56228ab..d624974 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -18,26 +18,24 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.Supplier;
-
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
/**
* The asynchronous region locator.
@@ -59,8 +57,8 @@ class AsyncRegionLocator {
this.retryTimer = retryTimer;
}
- private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
- long timeoutNs, Supplier<String> timeoutMsg) {
+ private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
+ Supplier<String> timeoutMsg) {
if (future.isDone() || timeoutNs <= 0) {
return future;
}
@@ -78,74 +76,75 @@ class AsyncRegionLocator {
});
}
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ // Whether the given table is the meta table, which is served by the dedicated meta locator.
+ private boolean isMeta(TableName tableName) {
+ return TableName.isMetaTableName(tableName);
+ }
+
+ /**
+ * Get the {@link RegionLocations} for the region of the given row. The lookup is done with the
+ * default replica id, using the meta locator for the meta table and the non-meta locator
+ * otherwise, and the result is wrapped with the given timeout.
+ */
+ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
+ CompletableFuture<RegionLocations> future = isMeta(tableName)
+ ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
+ : nonMetaRegionLocator.getRegionLocations(tableName, row,
+ RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+ return withTimeout(future, timeoutNs,
+ () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+ "ms) waiting for region locations for " + tableName + ", row='" +
+ Bytes.toStringBinary(row) + "'");
+ }
+
+ /**
+ * Locate the given replica of the region for {@code row}. The returned future is failed if the
+ * underlying lookup fails, if the located {@link RegionLocations} has no entry for
+ * {@code replicaId}, or if that entry has no server name.
+ */
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
- CompletableFuture<HRegionLocation> future =
- tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload)
- : nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload);
+ CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+ CompletableFuture<RegionLocations> locsFuture =
+ isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
+ : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+ addListener(locsFuture, (locs, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ future
+ .completeExceptionally(new RegionException("No location for " + tableName + ", row='" +
+ Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
+ } else if (loc.getServerName() == null) {
+ // close the quote around the region name so the message reads '<region>', row='<row>'
+ future.completeExceptionally(new HBaseIOException("No server address listed for region '" +
+ loc.getRegion().getRegionNameAsString() + "', row='" + Bytes.toStringBinary(row) +
+ "', locateType=" + type + ", replicaId=" + replicaId));
+ } else {
+ future.complete(loc);
+ }
+ });
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
- "ms) waiting for region location for " + tableName + ", row='" +
- Bytes.toStringBinary(row) + "'");
+ "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) +
+ "', replicaId=" + replicaId);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType type, long timeoutNs) {
- return getRegionLocation(tableName, row, type, false, timeoutNs);
+ int replicaId, RegionLocateType type, long timeoutNs) {
+ return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
}
- static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
- // Do not need to update if no such location, or the location is newer, or the location is not
- // same with us
- return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
- oldLoc.getServerName().equals(loc.getServerName());
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ RegionLocateType type, boolean reload, long timeoutNs) {
+ return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
+ timeoutNs);
}
- static void updateCachedLocation(HRegionLocation loc, Throwable exception,
- Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
- Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
- HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
- }
- if (!canUpdate(loc, oldLoc)) {
- return;
- }
- Throwable cause = findException(exception);
- if (LOG.isDebugEnabled()) {
- LOG.debug("The actual exception when updating " + loc, cause);
- }
- if (cause == null || !isMetaClearingException(cause)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Will not update " + loc + " because the exception is null or not the one we care about");
- }
- return;
- }
- if (cause instanceof RegionMovedException) {
- RegionMovedException rme = (RegionMovedException) cause;
- HRegionLocation newLoc =
- new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
- }
- addToCache.accept(newLoc);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try removing " + loc + " from cache");
- }
- removeFromCache.accept(loc);
- }
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ RegionLocateType type, long timeoutNs) {
+ return getRegionLocation(tableName, row, type, false, timeoutNs);
}
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
if (loc.getRegion().isMetaRegion()) {
- metaRegionLocator.updateCachedLocation(loc, exception);
+ metaRegionLocator.updateCachedLocationOnError(loc, exception);
} else {
- nonMetaRegionLocator.updateCachedLocation(loc, exception);
+ nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
new file mode 100644
index 0000000..5c9c091
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for asynchronous region locator.
+ * <p/>
+ * Contains the static utilities shared by the locators: deciding whether a cached location may be
+ * changed after an error, and building new {@link RegionLocations} instances from existing ones.
+ */
+@InterfaceAudience.Private
+final class AsyncRegionLocatorHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocatorHelper.class);
+
+  private AsyncRegionLocatorHelper() {
+  }
+
+  /**
+   * Check whether the cached location {@code oldLoc} may be updated or removed on behalf of
+   * {@code loc}.
+   * @param loc the location reported together with the error
+   * @param oldLoc the location currently in cache, may be {@code null}
+   * @return {@code true} only if {@code oldLoc} exists, has a server name, is not newer than
+   *         {@code loc} and points to the same server as {@code loc}
+   */
+  static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
+    // Do not need to update if no such location, or the location is newer, or the location is not
+    // the same with us. A location may come without a server name (isGood below checks for it), so
+    // treat such a cached location as "not the same with us" instead of dereferencing null.
+    if (oldLoc == null || oldLoc.getServerName() == null) {
+      return false;
+    }
+    return oldLoc.getSeqNum() <= loc.getSeqNum() &&
+      oldLoc.getServerName().equals(loc.getServerName());
+  }
+
+  /**
+   * Update the cache according to the given error.
+   * <p/>
+   * Nothing is done if the cached location can not be updated (see
+   * {@link #canUpdateOnError(HRegionLocation, HRegionLocation)}), or the unwrapped exception is
+   * null or not a meta clearing exception. For a {@link RegionMovedException} a new location built
+   * from the exception is passed to {@code addToCache}, otherwise {@code loc} is passed to
+   * {@code removeFromCache}.
+   * @param loc the location reported together with the error
+   * @param exception the error
+   * @param cachedLocationSupplier returns the currently cached location for the given location
+   * @param addToCache called with the new location when the region has moved
+   * @param removeFromCache called with {@code loc} when the location should be dropped
+   */
+  static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
+      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
+    HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
+    LOG.debug("Try updating {} , the old value is {}", loc, oldLoc, exception);
+    if (!canUpdateOnError(loc, oldLoc)) {
+      return;
+    }
+    Throwable cause = findException(exception);
+    LOG.debug("The actual exception when updating {}", loc, cause);
+    if (cause == null || !isMetaClearingException(cause)) {
+      LOG.debug("Will not update {} because the exception is null or not the one we care about",
+        loc);
+      return;
+    }
+    if (cause instanceof RegionMovedException) {
+      RegionMovedException rme = (RegionMovedException) cause;
+      HRegionLocation newLoc =
+        new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum());
+      LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme);
+      addToCache.accept(newLoc);
+    } else {
+      LOG.debug("Try removing {} from cache", loc);
+      removeFromCache.accept(loc);
+    }
+  }
+
+  /**
+   * Create a {@link RegionLocations} which only contains the given {@code loc}, placed at the
+   * index of its replica id. All the slots before it are left null.
+   */
+  static RegionLocations createRegionLocations(HRegionLocation loc) {
+    int replicaId = loc.getRegion().getReplicaId();
+    HRegionLocation[] locs = new HRegionLocation[replicaId + 1];
+    locs[replicaId] = loc;
+    return new RegionLocations(locs);
+  }
+
+  /**
+   * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and replace the
+   * location for the given {@code replicaId} with the given {@code loc}.
+   * <p/>
+   * All the {@link RegionLocations} in async locator related class are immutable because we want to
+   * access them concurrently, so here we need to create a new one, instead of calling
+   * {@link RegionLocations#updateLocation(HRegionLocation, boolean, boolean)}.
+   */
+  static RegionLocations replaceRegionLocation(RegionLocations oldLocs, HRegionLocation loc) {
+    int replicaId = loc.getRegion().getReplicaId();
+    HRegionLocation[] locs = oldLocs.getRegionLocations();
+    // copy, and grow the array if the replica id is beyond the current length
+    locs = Arrays.copyOf(locs, Math.max(replicaId + 1, locs.length));
+    locs[replicaId] = loc;
+    return new RegionLocations(locs);
+  }
+
+  /**
+   * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and remove the
+   * location for the given {@code replicaId}.
+   * <p/>
+   * All the {@link RegionLocations} in async locator related class are immutable because we want to
+   * access them concurrently, so here we need to create a new one, instead of calling
+   * {@link RegionLocations#remove(int)}.
+   * @return {@code oldLocs} itself if it has no slot for {@code replicaId}, {@code null} if no
+   *         location is left after the removal, otherwise the new {@link RegionLocations}
+   */
+  static RegionLocations removeRegionLocation(RegionLocations oldLocs, int replicaId) {
+    HRegionLocation[] locs = oldLocs.getRegionLocations();
+    if (locs.length < replicaId + 1) {
+      // Here we do not modify the oldLocs so it is safe to return it.
+      return oldLocs;
+    }
+    locs = Arrays.copyOf(locs, locs.length);
+    locs[replicaId] = null;
+    if (ObjectUtils.firstNonNull(locs) != null) {
+      return new RegionLocations(locs);
+    } else {
+      // if all the locations are null, just return null
+      return null;
+    }
+  }
+
+  /**
+   * Create a new {@link RegionLocations} which is the merging result for the given two
+   * {@link RegionLocations}.
+   * <p/>
+   * All the {@link RegionLocations} in async locator related class are immutable because we want to
+   * access them concurrently, so here we need to create a new one, instead of calling
+   * {@link RegionLocations#mergeLocations(RegionLocations)} directly.
+   */
+  static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) {
+    RegionLocations locs = new RegionLocations(newLocs.getRegionLocations());
+    // NOTE(review): the return value of mergeLocations is ignored here, so this only works if
+    // mergeLocations modifies the instance it is called on - TODO confirm against RegionLocations.
+    locs.mergeLocations(oldLocs);
+    return locs;
+  }
+
+  /**
+   * Check whether the given {@code locs} is usable for the given replica, i.e. it is not null and
+   * it has a location with a server name for {@code replicaId}.
+   */
+  static boolean isGood(RegionLocations locs, int replicaId) {
+    if (locs == null) {
+      return false;
+    }
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    return loc != null && loc.getServerName() != null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index d30012f..e03049a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -88,15 +88,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}
- protected long remainingTimeNs() {
+ protected final long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
- protected void completeExceptionally() {
+ protected final void completeExceptionally() {
future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
}
- protected void resetCallTimeout() {
+ protected final void resetCallTimeout() {
long callTimeoutNs;
if (operationTimeoutNs > 0) {
callTimeoutNs = remainingTimeNs();
@@ -111,8 +111,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
resetController(controller, callTimeoutNs);
}
- protected void onError(Throwable error, Supplier<String> errMsg,
+ protected final void onError(Throwable error, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
+ if (future.isDone()) {
+ // Give up if the future is already done. This is possible if the user has already canceled
+ // the future. For timeline consistent reads, we will also cancel some requests if we have
+ // already got one of the responses.
+ LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
+ return;
+ }
error = translateException(error);
if (error instanceof DoNotRetryIOException) {
future.completeExceptionally(error);
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index f80b4e5..a660e74 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,22 +17,22 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@@ -75,6 +75,8 @@ class AsyncRpcRetryingCallerFactory {
private RegionLocateType locateType = RegionLocateType.CURRENT;
+ private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@@ -121,11 +123,17 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ /**
+ * Set the replica id to use for this request. The value is handed to the caller created by
+ * {@link #build()}, which rejects negative values.
+ */
+ public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
+ this.replicaId = replicaId;
+ return this;
+ }
+
public AsyncSingleRequestRpcRetryingCaller<T> build() {
+ checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
- checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
- checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
- pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
+ checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
+ pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -241,11 +249,11 @@ class AsyncRpcRetryingCallerFactory {
public AsyncScanSingleRegionRpcRetryingCaller build() {
checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
- checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
- checkNotNull(resultCache, "resultCache is null"),
- checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
- checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
- pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
+ checkNotNull(resultCache, "resultCache is null"),
+ checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
+ checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
+ pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -311,7 +319,7 @@ class AsyncRpcRetryingCallerFactory {
public <T> AsyncBatchRpcRetryingCaller<T> build() {
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
- maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
public <T> List<CompletableFuture<T>> call() {
@@ -363,8 +371,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncMasterRequestRpcRetryingCaller<T> build() {
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
- checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
- rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -390,7 +398,8 @@ class AsyncRpcRetryingCallerFactory {
private ServerName serverName;
- public AdminRequestCallerBuilder<T> action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
+ public AdminRequestCallerBuilder<T> action(
+ AsyncAdminRequestRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@@ -420,15 +429,15 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public AdminRequestCallerBuilder<T> serverName(ServerName serverName){
+ public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
this.serverName = serverName;
return this;
}
public AsyncAdminRequestRetryingCaller<T> build() {
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
- operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
- "serverName is null"), checkNotNull(callable, "action is null"));
+ operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+ checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}
public CompletableFuture<T> call() {
@@ -436,7 +445,7 @@ class AsyncRpcRetryingCallerFactory {
}
}
- public <T> AdminRequestCallerBuilder<T> adminRequest(){
+ public <T> AdminRequestCallerBuilder<T> adminRequest() {
return new AdminRequestCallerBuilder<>();
}
@@ -488,8 +497,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncServerRequestRpcRetryingCaller<T> build() {
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
- operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
- "serverName is null"), checkNotNull(callable, "action is null"));
+ operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+ checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}
public CompletableFuture<T> call() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 56c82fb..1a52e5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,17 +17,19 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
* Retry caller for a single request, such as get, put, delete, etc.
@@ -45,18 +47,21 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
private final byte[] row;
+ private final int replicaId;
+
private final RegionLocateType locateType;
private final Callable<T> callable;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
- long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
- int startLogErrorsCnt) {
+ TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
+ Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
+ this.replicaId = replicaId;
this.locateType = locateType;
this.callable = callable;
}
@@ -67,23 +72,22 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
stub = conn.getRegionServerStub(loc.getServerName());
} catch (IOException e) {
onError(e,
- () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
- + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
- err -> conn.getLocator().updateCachedLocation(loc, err));
+ () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) +
+ "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
return;
}
resetCallTimeout();
- callable.call(controller, loc, stub).whenComplete(
- (result, error) -> {
- if (error != null) {
- onError(error,
- () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
- + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
- err -> conn.getLocator().updateCachedLocation(loc, err));
- return;
- }
- future.complete(result);
- });
+ callable.call(controller, loc, stub).whenComplete((result, error) -> {
+ if (error != null) {
+ onError(error,
+ () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
+ loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+ return;
+ }
+ future.complete(result);
+ });
}
@Override
@@ -98,18 +102,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
} else {
locateTimeoutNs = -1L;
}
- conn.getLocator()
- .getRegionLocation(tableName, row, locateType, locateTimeoutNs)
- .whenComplete(
- (loc, error) -> {
- if (error != null) {
- onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
- + " failed", err -> {
- });
- return;
- }
- call(loc);
- });
+ addListener(
+ conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs),
+ (loc, error) -> {
+ if (error != null) {
+ onError(error,
+ () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
+ });
+ return;
+ }
+ call(loc);
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index dbfcef5..3bda38e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
-
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -55,5 +54,30 @@ public interface AsyncTableRegionLocator {
* @param row Row to find.
* @param reload true to reload information or false to use cached information
*/
- CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload);
+ default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
+ // No replica was specified, so ask for the default replica.
+ int defaultReplicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+ return getRegionLocation(row, defaultReplicaId, reload);
+ }
+
+ /**
+ * Finds the region replica identified by <code>replicaId</code> that serves the given row.
+ * <p>
+ * This is a shortcut for {@link #getRegionLocation(byte[], int, boolean)} with
+ * <code>reload</code> set to <code>false</code>, i.e. cached information is used.
+ * @param row Row to find.
+ * @param replicaId the replica id of the region
+ * @return a future for the location of the region with the given <code>replicaId</code> to which
+ * the row belongs
+ */
+ default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId) {
+ boolean reload = false;
+ return getRegionLocation(row, replicaId, reload);
+ }
+
+ /**
+ * Finds the region with the given <code>replicaId</code> on which the given row is being served.
+ * <p>
+ * Returns the location of the region with the given <code>replicaId</code> to which the row
+ * belongs.
+ * @param row Row to find.
+ * @param replicaId the replica id of the region
+ * @param reload true to reload information or false to use cached information
+ */
+ CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 7d199df..465a411 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -44,7 +44,9 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
}
@Override
- public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
- return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L);
+ public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
+ boolean reload) {
+ return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
+ -1L);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index d996004..55c62e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -38,6 +38,9 @@ public class ConnectionConfiguration {
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+ // Configuration key and default value for the timeout that the constructor reads into
+ // primaryCallTimeoutMicroSecond. The unit is microseconds, as the constant names say.
+ public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
+ "hbase.client.primaryCallTimeout.get";
+ public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
@@ -86,7 +89,7 @@ public class ConnectionConfiguration {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.primaryCallTimeoutMicroSecond =
- conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms
+ conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
this.replicaCallTimeoutMicroSecondScan =
conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms