You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/04 06:40:32 UTC
[06/15] hbase git commit: HBASE-17356 Add replica get support
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d705d7c..28db7e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,9 +45,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -63,7 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
/**
* The implementation of RawAsyncTable.
- * <p>
+ * <p/>
* The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
* be finished inside the rpc framework thread, which means that the callbacks registered to the
* {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
@@ -74,6 +77,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
+ private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
+
private final AsyncConnectionImpl conn;
private final TableName tableName;
@@ -204,58 +209,126 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row)
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
}
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
return newCaller(row.getRow(), rpcTimeoutNs);
}
+ private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
+ return this.<Result> newCaller(get, timeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl
+ .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
+ RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
+ (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
+ .replicaId(replicaId).call();
+ }
+
+ // Connect the two futures, if the src future is done, then mark the dst future as done. And if
+ // the dst future is done, then cancel the src future. This is used for timeline consistent read.
+ private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
+ addListener(srcFuture, (r, e) -> {
+ if (e != null) {
+ dstFuture.completeExceptionally(e);
+ } else {
+ dstFuture.complete(r);
+ }
+ });
+ // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
+ // Notice that this is a bit tricky, as the execution chain may be 'complete src -> complete dst
+ // -> cancel src', for now it seems to be fine, as the completion will use CAS to set the result first in
+ // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
+ // tie.
+ addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
+ }
+
+ private void timelineConsistentGet(Get get, RegionLocations locs,
+ CompletableFuture<Result> future) {
+ if (future.isDone()) {
+ // do not send requests to secondary replicas if the future is done, i.e., the primary request
+ // has already been finished.
+ return;
+ }
+ for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
+ CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs);
+ connect(secondaryFuture, future);
+ }
+ }
+
@Override
public CompletableFuture<Result> get(Get get) {
- return this.<Result> newCaller(get, readRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl
- .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
- RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
- (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
- .call();
+ CompletableFuture<Result> primaryFuture =
+ get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
+ if (get.getConsistency() == Consistency.STRONG) {
+ return primaryFuture;
+ }
+ // Timeline consistent read, where we will send requests to other region replicas
+ CompletableFuture<Result> future = new CompletableFuture<>();
+ connect(primaryFuture, future);
+ long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
+ long startNs = System.nanoTime();
+ addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
+ RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
+ if (error != null) {
+ LOG.warn(
+ "Failed to locate all the replicas for table={}, row='{}'," +
+ " give up timeline consistent read",
+ tableName, Bytes.toStringBinary(get.getRow()), error);
+ return;
+ }
+ if (locs.size() <= 1) {
+ LOG.warn(
+ "There are no secondary replicas for region {}," + " give up timeline consistent read",
+ locs.getDefaultRegionLocation().getRegion());
+ return;
+ }
+ long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
+ if (delayNs <= 0) {
+ timelineConsistentGet(get, locs, future);
+ } else {
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(
+ timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
+ }
+ });
+ return future;
}
@Override
public CompletableFuture<Void> put(Put put) {
return this.<Void> newCaller(put, writeRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
- put, RequestConverter::buildMutateRequest))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
+ put, RequestConverter::buildMutateRequest))
+ .call();
}
@Override
public CompletableFuture<Void> delete(Delete delete) {
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
- stub, delete, RequestConverter::buildMutateRequest))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
+ stub, delete, RequestConverter::buildMutateRequest))
+ .call();
}
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
return this.<Result> newCaller(append, rpcTimeoutNs)
- .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
- append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
- .call();
+ .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
+ append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+ .call();
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
return this.<Result> newCaller(increment, rpcTimeoutNs)
- .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
- stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
- .call();
+ .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
+ stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+ .call();
}
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -313,36 +386,36 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
- loc, stub, put,
- (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
- (c, r) -> r.getProcessed()))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+ stub, put,
+ (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
+ (c, r) -> r.getProcessed()))
+ .call();
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
- loc, stub, delete,
- (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
- (c, r) -> r.getProcessed()))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
+ loc, stub, delete,
+ (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
+ (c, r) -> r.getProcessed()))
+ .call();
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
- stub, mutation,
- (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
- resp -> resp.getExists()))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
+ stub, mutation,
+ (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
+ resp -> resp.getExists()))
+ .call();
}
}
@@ -375,10 +448,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (ex != null) {
future.completeExceptionally(ex instanceof IOException ? ex
: new IOException(
- "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
+ "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
} else {
future.complete(respConverter
- .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
+ .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
}
} catch (IOException e) {
future.completeExceptionally(e);
@@ -399,7 +472,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- }, resp -> null)).call();
+ }, resp -> null))
+ .call();
}
private Scan setDefaultScanConfig(Scan scan) {
@@ -416,7 +490,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
- maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+ maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
}
private long resultSize2CacheSize(long maxResultSize) {
@@ -427,8 +501,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public ResultScanner getScanner(Scan scan) {
return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
- resultSize2CacheSize(
- scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+ resultSize2CacheSize(
+ scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
}
@Override
@@ -477,14 +551,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
- .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
+ .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
return conn.callerFactory.batch().table(tableName).actions(actions)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
}
@Override
@@ -515,7 +589,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
- region, row, rpcTimeoutNs, operationTimeoutNs);
+ region, row, rpcTimeoutNs, operationTimeoutNs);
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
@@ -553,10 +627,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, CoprocessorCallback<R> callback,
- List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
- AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
- Throwable error) {
+ ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
+ byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
+ AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
if (error != null) {
callback.onError(error);
return;
@@ -566,11 +639,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (locateFinished(region, endKey, endKeyInclusive)) {
locateFinished.set(true);
} else {
- conn.getLocator()
- .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
- operationTimeoutNs)
- .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
- endKeyInclusive, locateFinished, unfinishedRequest, l, e));
+ addListener(
+ conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
+ operationTimeoutNs),
+ (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
+ locateFinished, unfinishedRequest, l, e));
}
coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
if (e != null) {
@@ -630,11 +703,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public void execute() {
- conn.getLocator().getRegionLocation(tableName, startKey,
- startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
- .whenComplete(
- (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
- endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+ addListener(conn.getLocator().getRegionLocation(tableName, startKey,
+ startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
+ (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
+ endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
new file mode 100644
index 0000000..067e66b
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for processing futures.
+ */
+@InterfaceAudience.Private
+public final class FutureUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
+
+ private FutureUtils() {
+ }
+
+ /**
+ * This method is used when you just want to add a listener to the given future. We will call
+ * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
+ * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
+ * suppress exceptions thrown from the code that completes the future, and this method will catch
+ * all the exceptions thrown from the {@code action} to catch possible code bugs.
+ * <p/>
+ * And the error prone check will always report FutureReturnValueIgnored because every method in
+ * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
+ * have one future that has not been checked. So we introduce this method and add a suppress
+ * warnings annotation here.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public static <T> void addListener(CompletableFuture<T> future,
+ BiConsumer<? super T, ? super Throwable> action) {
+ future.whenComplete((resp, error) -> {
+ try {
+ action.accept(resp, error);
+ } catch (Throwable t) {
+ LOG.error("Unexpected error caught when processing CompletableFuture", t);
+ }
+ });
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
new file mode 100644
index 0000000..c14f69f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+final class RegionReplicaTestHelper {
+
+ private RegionReplicaTestHelper() {
+ }
+
+ // waits for all replicas to have region location
+ static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry,
+ int regionReplication) throws IOException {
+ TestZKAsyncRegistry.TEST_UTIL.waitFor(
+ TestZKAsyncRegistry.TEST_UTIL.getConfiguration()
+ .getLong("hbase.client.sync.wait.timeout.msec", 60000),
+ 200, true, new ExplainingPredicate<IOException>() {
+ @Override
+ public String explainFailure() throws IOException {
+ return "Not all meta replicas get assigned";
+ }
+
+ @Override
+ public boolean evaluate() throws IOException {
+ try {
+ RegionLocations locs = registry.getMetaRegionLocation().get();
+ if (locs.size() < regionReplication) {
+ return false;
+ }
+ for (int i = 0; i < regionReplication; i++) {
+ if (locs.getRegionLocation(i) == null) {
+ return false;
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e);
+ return false;
+ }
+ }
+ });
+ }
+
+ static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName,
+ int replicaId) {
+ return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .filter(rs -> rs.getRegions(tableName).stream()
+ .anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId))
+ .findAny().map(rs -> rs.getServerName());
+ }
+
+ /**
+ * Return the new location.
+ */
+ static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc)
+ throws Exception {
+ ServerName serverName = currentLoc.getServerName();
+ RegionInfo regionInfo = currentLoc.getRegion();
+ TableName tableName = regionInfo.getTable();
+ int replicaId = regionInfo.getReplicaId();
+ ServerName newServerName = util.getHBaseCluster().getRegionServerThreads().stream()
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+ .get();
+ util.getAdmin().move(regionInfo.getEncodedNameAsBytes(),
+ Bytes.toBytes(newServerName.getServerName()));
+ util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ Optional<ServerName> newServerName = getRSCarryingReplica(util, tableName, replicaId);
+ return newServerName.isPresent() && !newServerName.get().equals(serverName);
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return regionInfo.getRegionNameAsString() + " is still on " + serverName;
+ }
+ });
+ return newServerName;
+ }
+
+ interface Locator {
+ RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+ throws Exception;
+
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
+ }
+
+ static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator)
+ throws Exception {
+ RegionLocations locs =
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false);
+ assertEquals(3, locs.size());
+ for (int i = 0; i < 3; i++) {
+ HRegionLocation loc = locs.getRegionLocation(i);
+ assertNotNull(loc);
+ ServerName serverName = getRSCarryingReplica(util, tableName, i).get();
+ assertEquals(serverName, loc.getServerName());
+ }
+ ServerName newServerName = moveRegion(util, locs.getDefaultRegionLocation());
+ // The cached location should not be changed
+ assertEquals(locs.getDefaultRegionLocation().getServerName(),
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+ .getDefaultRegionLocation().getServerName());
+ // should get the new location when reload = true
+ assertEquals(newServerName,
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
+ .getDefaultRegionLocation().getServerName());
+ // the cached location should be replaced
+ assertEquals(newServerName,
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+ .getDefaultRegionLocation().getServerName());
+
+ ServerName newServerName1 = moveRegion(util, locs.getRegionLocation(1));
+ ServerName newServerName2 = moveRegion(util, locs.getRegionLocation(2));
+
+ // The cached location should not be changed
+ assertEquals(locs.getRegionLocation(1).getServerName(),
+ locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+ // clear the cached location for replica 1
+ locator.updateCachedLocationOnError(locs.getRegionLocation(1), new NotServingRegionException());
+ // the cached location for replica 2 should not be changed
+ assertEquals(locs.getRegionLocation(2).getServerName(),
+ locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+ // should get the new location as we have cleared the old location
+ assertEquals(newServerName1,
+ locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+ // as we will get the new location for replica 2 at once, we should also get the new location
+ // for replica 2
+ assertEquals(newServerName2,
+ locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 7c08d6d..df1fe08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,20 +17,19 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
-import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -42,7 +41,7 @@ public class TestAsyncMetaRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
+ HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -53,10 +52,11 @@ public class TestAsyncMetaRegionLocator {
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
+ TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
- TEST_UTIL.waitUntilAllSystemRegionsAssigned();
- TEST_UTIL.getAdmin().setBalancerRunning(false, true);
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
+ TEST_UTIL.getAdmin().balancerSwitch(false, true);
LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
}
@@ -66,42 +66,21 @@ public class TestAsyncMetaRegionLocator {
TEST_UTIL.shutdownMiniCluster();
}
- private Optional<ServerName> getRSCarryingMeta() {
- return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer())
- .filter(rs -> !rs.getRegions(TableName.META_TABLE_NAME).isEmpty()).findAny()
- .map(rs -> rs.getServerName());
- }
-
@Test
- public void testReload() throws Exception {
- ServerName serverName = getRSCarryingMeta().get();
- assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
-
- ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
- .findAny().get();
- TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
- Bytes.toBytes(newServerName.getServerName()));
- TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+ public void test() throws Exception {
+ testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() {
@Override
- public boolean evaluate() throws Exception {
- Optional<ServerName> newServerName = getRSCarryingMeta();
- return newServerName.isPresent() && !newServerName.get().equals(serverName);
+ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+ throws Exception {
+ LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
- public String explainFailure() throws Exception {
- return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " +
- serverName;
+ public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+ throws Exception {
+ return LOCATOR.getRegionLocations(replicaId, reload).get();
}
});
- // The cached location will not change
- assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
- // should get the new location when reload = true
- assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName());
- // the cached location should be replaced
- assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 38dc78d..eeaf99f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -38,10 +39,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -58,7 +61,7 @@ public class TestAsyncNonMetaRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
+ HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -78,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
@@ -109,11 +112,18 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
}
+ private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
+ byte[] row, RegionLocateType locateType, boolean reload) {
+ return LOCATOR // test helper: locate passing the default replica id
+ .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
+ .thenApply(RegionLocations::getDefaultRegionLocation); // keep only the default location
+ }
+
@Test
public void testNoTable() throws InterruptedException {
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@@ -126,7 +136,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@@ -148,13 +158,13 @@ public class TestAsyncNonMetaRegionLocator {
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
ThreadLocalRandom.current().nextBytes(randKey);
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get());
}
}
@@ -179,12 +189,12 @@ public class TestAsyncNonMetaRegionLocator {
private ServerName[] getLocations(byte[][] startKeys) {
ServerName[] serverNames = new ServerName[startKeys.length];
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
- .forEach(rs -> {
- rs.getRegions(TABLE_NAME).forEach(r -> {
- serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
- Bytes::compareTo)] = rs.getServerName();
- });
+ .forEach(rs -> {
+ rs.getRegions(TABLE_NAME).forEach(r -> {
+ serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+ Bytes::compareTo)] = rs.getServerName();
});
+ });
return serverNames;
}
@@ -196,8 +206,9 @@ public class TestAsyncNonMetaRegionLocator {
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
- serverNames[i], LOCATOR
- .getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get());
+ serverNames[i],
+ getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false)
+ .get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -208,7 +219,7 @@ public class TestAsyncNonMetaRegionLocator {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
serverNames[i],
- LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
+ getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -220,8 +231,7 @@ public class TestAsyncNonMetaRegionLocator {
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
try {
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
- LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false)
- .get());
+ getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -232,29 +242,29 @@ public class TestAsyncNonMetaRegionLocator {
public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
createSingleRegionTable();
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
- HRegionLocation loc = LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
+ HRegionLocation loc =
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
- .findAny().get();
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+ .get();
TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()),
Bytes.toBytes(newServerName.getServerName()));
while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
- .equals(newServerName)) {
+ .equals(newServerName)) {
Thread.sleep(100);
}
// Should be same as it is in cache
- assertSame(loc, LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
- LOCATOR.updateCachedLocation(loc, null);
+ assertSame(loc,
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+ LOCATOR.updateCachedLocationOnError(loc, null);
// null error will not trigger a cache cleanup
- assertSame(loc, LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
- LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+ assertSame(loc,
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+ LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException());
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
}
// usually locate after will return the same result, so we add a test to make it return different
@@ -266,21 +276,21 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
TEST_UTIL.waitTableAvailable(TABLE_NAME);
HRegionLocation currentLoc =
- LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
+ getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
HRegionLocation afterLoc =
- LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
+ getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
ServerName afterServerName =
- TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
- .filter(rs -> rs.getRegions(TABLE_NAME).stream()
- .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
- .findAny().get().getServerName();
+ TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .filter(rs -> rs.getRegions(TABLE_NAME).stream()
+ .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
+ .findAny().get().getServerName();
assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
assertSame(afterLoc,
- LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
+ getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
}
// For HBASE-17402
@@ -292,9 +302,9 @@ public class TestAsyncNonMetaRegionLocator {
ServerName[] serverNames = getLocations(startKeys);
for (int i = 0; i < 100; i++) {
LOCATOR.clearCache(TABLE_NAME);
- List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000)
- .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
- .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
+ List<CompletableFuture<HRegionLocation>> futures =
+ IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
+ .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
.collect(toList());
for (int j = 0; j < 1000; j++) {
int index = Math.min(8, j / 111);
@@ -309,11 +319,11 @@ public class TestAsyncNonMetaRegionLocator {
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
- .findAny().get();
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+ .get();
Admin admin = TEST_UTIL.getAdmin();
RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get();
admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName()));
@@ -334,15 +344,15 @@ public class TestAsyncNonMetaRegionLocator {
// The cached location will not change
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
// should get the new location when reload = true
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
// the cached location should be replaced
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
}
@@ -351,10 +361,32 @@ public class TestAsyncNonMetaRegionLocator {
public void testLocateBeforeLastRegion()
throws IOException, InterruptedException, ExecutionException {
createMultiRegionTable();
- LOCATOR.getRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
+ getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
HRegionLocation loc =
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
// should locate to the last region
assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW);
}
+ /** Runs the shared testLocator checks against a table created with region replication 3. */
+ @Test
+ public void testRegionReplicas() throws Exception {
+ TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build());
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+ testLocator(TEST_UTIL, TABLE_NAME, new Locator() {
+ // adapter: forwards both Locator callbacks to the AsyncNonMetaRegionLocator under test
+ @Override
+ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+ throws Exception {
+ LOCATOR.updateCachedLocationOnError(loc, error);
+ }
+ // always locates EMPTY_START_ROW with RegionLocateType.CURRENT and blocks on the future
+ @Override
+ public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+ throws Exception {
+ return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+ RegionLocateType.CURRENT, reload).get();
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index c6624e7..8cdb4a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -59,7 +60,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+ HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
- .toArray(byte[][]::new);
+ .toArray(byte[][]::new);
TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
}
@@ -138,11 +139,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.shutdownMiniCluster();
}
- private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
+ private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
throws InterruptedException, ExecutionException {
assertEquals(256, futures.size());
for (int i = 0; i < futures.size(); i++) {
- HRegionLocation loc = futures.get(i).get();
+ HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
if (i == 0) {
assertTrue(isEmptyStartRow(loc.getRegion().getStartKey()));
} else {
@@ -158,10 +159,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@Test
public void test() throws InterruptedException, ExecutionException {
- List<CompletableFuture<HRegionLocation>> futures =
- IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
- .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
- .collect(toList());
+ List<CompletableFuture<RegionLocations>> futures =
+ IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+ .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+ RegionLocateType.CURRENT, false))
+ .collect(toList());
assertLocs(futures);
assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(),
MAX_CONCURRENCY.get() <= MAX_ALLOWED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index a6c2efb..7d8956b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -49,7 +49,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
+ HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), User.getCurrent());
}
@AfterClass
@@ -89,8 +89,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(
TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
- AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME)
- .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build();
+ AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
+ .setMaxRetries(30).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// move back
@@ -110,8 +110,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
public void testMaxRetries() throws IOException, InterruptedException {
try {
CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
- .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
- .action((controller, loc, stub) -> failedFuture()).call().get();
+ .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
+ .action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
@@ -123,8 +123,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
long startNs = System.nanoTime();
try {
CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
- .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
- .action((controller, loc, stub) -> failedFuture()).call().get();
+ .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
+ .action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
e.printStackTrace();
@@ -141,30 +141,30 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
AtomicInteger count = new AtomicInteger(0);
HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
AsyncRegionLocator mockedLocator =
- new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
- @Override
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType locateType, long timeoutNs) {
- if (tableName.equals(TABLE_NAME)) {
- CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- if (count.getAndIncrement() == 0) {
- errorTriggered.set(true);
- future.completeExceptionally(new RuntimeException("Inject error!"));
- } else {
- future.complete(loc);
- }
- return future;
+ new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
+ @Override
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ int replicaId, RegionLocateType locateType, long timeoutNs) {
+ if (tableName.equals(TABLE_NAME)) {
+ CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+ if (count.getAndIncrement() == 0) {
+ errorTriggered.set(true);
+ future.completeExceptionally(new RuntimeException("Inject error!"));
} else {
- return super.getRegionLocation(tableName, row, locateType, timeoutNs);
+ future.complete(loc);
}
+ return future;
+ } else {
+ return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
}
+ }
- @Override
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
- }
- };
+ @Override
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ }
+ };
try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
- CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
+ CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
@@ -172,7 +172,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
}
}) {
AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
- .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
+ .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertTrue(errorTriggered.get());
errorTriggered.set(false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 13d8000..6c6bb98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -69,8 +69,8 @@ public class TestAsyncTableLocatePrefetch {
@Test
public void test() throws InterruptedException, ExecutionException {
- assertNotNull(LOCATOR
- .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get());
+ assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
+ RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
// we finish the request before we adding the remaining results to cache so sleep a bit here
Thread.sleep(1000);
// confirm that the locations of all the regions have been cached.
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
new file mode 100644
index 0000000..0445a0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+/** Tests TIMELINE consistency gets on a table with 3 region replicas, once per table supplier. */
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionReplicasGet {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+ private static byte[] ROW = Bytes.toBytes("row");
+
+ private static byte[] VALUE = Bytes.toBytes("value");
+ /** Shared connection, opened in setUpBeforeClass and closed in tearDownAfterClass. */
+ private static AsyncConnection ASYNC_CONN;
+
+ @Rule
+ public TestName testName = new TestName();
+ /** Supplies the table under test: getRawTable or getTable, as listed by params(). */
+ @Parameter
+ public Supplier<AsyncTable<?>> getTable;
+ /** Returns the table obtained from the shared connection without passing an executor. */
+ private static AsyncTable<?> getRawTable() {
+ return ASYNC_CONN.getTable(TABLE_NAME);
+ }
+ /** Returns the table obtained by also passing ForkJoinPool.commonPool() to the connection. */
+ private static AsyncTable<?> getTable() {
+ return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+ }
+ /** One parameter set per table supplier. */
+ @Parameters
+ public static List<Object[]> params() {
+ return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
+ new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
+ }
+ /** When true, FailPrimaryGetCP throws for gets that reach the default replica. */
+ private static volatile boolean FAIL_PRIMARY_GET = false;
+ /** Number of preGetOp calls seen by the default replica of TABLE_NAME. */
+ private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
+ /** Number of preGetOp calls seen by non-default replicas of TABLE_NAME. */
+ private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
+ /** Coprocessor that counts gets per replica kind and can fail gets on the default replica. */
+ public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+ /** Ignores other tables; counts the get and, on the default replica, may inject a failure. */
+ @Override
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+ List<Cell> result) throws IOException {
+ RegionInfo region = c.getEnvironment().getRegionInfo();
+ if (!region.getTable().equals(TABLE_NAME)) {
+ return;
+ }
+ if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+ SECONDARY_GET_COUNT.incrementAndGet();
+ } else {
+ PRIMARY_GET_COUNT.incrementAndGet();
+ if (FAIL_PRIMARY_GET) {
+ throw new IOException("Inject error");
+ }
+ }
+ }
+ }
+ /** Returns false if any hosted region of TABLE_NAME yields an empty result for ROW. */
+ private static boolean allReplicasHaveRow() throws IOException {
+ for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
+ if (region.get(new Get(ROW), false).isEmpty()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ /** Starts the mini cluster, creates the replicated table with the coprocessor, writes ROW. */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // RPC read timeout: 10 minutes
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+ TimeUnit.MINUTES.toMillis(10));
+ // primary call timeout: 1 second, configured in microseconds
+ TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
+ TimeUnit.SECONDS.toMicros(1));
+ // set a small pause so we will retry very quickly
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+ // infinite retry
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.getAdmin()
+ .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
+ .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+ table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+ // disabling and re-enabling the table is the fastest way to let all replicas have the row
+ TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+ TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
+ TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ IOUtils.closeQuietly(ASYNC_CONN);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ /** With a healthy primary, 1000 TIMELINE gets must not reach any secondary replica. */
+ @Test
+ public void testNoReplicaRead() throws Exception {
+ FAIL_PRIMARY_GET = false;
+ SECONDARY_GET_COUNT.set(0);
+ AsyncTable<?> table = getTable.get();
+ Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+ for (int i = 0; i < 1000; i++) {
+ assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+ }
+ // the primary region is fine and the primary timeout is 1 second which is long enough, so we
+ // should not send any requests to secondary replicas even if the consistency is timeline.
+ Thread.sleep(5000);
+ assertEquals(0, SECONDARY_GET_COUNT.get());
+ }
+ /** With gets on the primary failing, a TIMELINE get must still return VALUE. */
+ @Test
+ public void testReplicaRead() throws Exception {
+ // fail the primary get request
+ FAIL_PRIMARY_GET = true;
+ Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+ // make sure that we could still get the value from secondary replicas
+ AsyncTable<?> table = getTable.get();
+ assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+ // make sure that the primary request has been canceled: the primary get count must not grow
+ Thread.sleep(5000);
+ int count = PRIMARY_GET_COUNT.get();
+ Thread.sleep(10000);
+ assertEquals(count, PRIMARY_GET_COUNT.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
index db7546f..46890d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNotSame;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
@@ -52,43 +50,13 @@ public class TestZKAsyncRegistry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
+ HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static ZKAsyncRegistry REGISTRY;
- // waits for all replicas to have region location
- static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException {
- TEST_UTIL.waitFor(
- TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
- new ExplainingPredicate<IOException>() {
- @Override
- public String explainFailure() throws IOException {
- return TEST_UTIL.explainTableAvailability(tbl);
- }
-
- @Override
- public boolean evaluate() throws IOException {
- AtomicBoolean ready = new AtomicBoolean(true);
- try {
- RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
- assertEquals(3, locs.getRegionLocations().length);
- IntStream.range(0, 3).forEach(i -> {
- HRegionLocation loc = locs.getRegionLocation(i);
- if (loc == null) {
- ready.set(false);
- }
- });
- } catch (Exception e) {
- ready.set(false);
- }
- return ready.get();
- }
- });
- }
-
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
@@ -107,14 +75,14 @@ public class TestZKAsyncRegistry {
LOG.info("STARTED TEST");
String clusterId = REGISTRY.getClusterId().get();
String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId();
- assertEquals("Expected " + expectedClusterId + ", found=" + clusterId,
- expectedClusterId, clusterId);
+ assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId,
+ clusterId);
assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(),
REGISTRY.getCurrentNrHRS().get().intValue());
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getMasterAddress().get());
assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
- waitUntilAllReplicasHavingRegionLocation(TableName.META_TABLE_NAME);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {