You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/03 02:55:27 UTC
[1/4] hbase git commit: HBASE-17356 Add replica get support
Repository: hbase
Updated Branches:
refs/heads/branch-2.0 40d2787e2 -> ff23c0221
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d705d7c..28db7e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,9 +45,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -63,7 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
/**
* The implementation of RawAsyncTable.
- * <p>
+ * <p/>
* The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
* be finished inside the rpc framework thread, which means that the callbacks registered to the
* {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
@@ -74,6 +77,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
+ private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
+
private final AsyncConnectionImpl conn;
private final TableName tableName;
@@ -204,58 +209,126 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row)
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
}
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
return newCaller(row.getRow(), rpcTimeoutNs);
}
+ private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
+ return this.<Result> newCaller(get, timeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl
+ .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
+ RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
+ (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
+ .replicaId(replicaId).call();
+ }
+
+ // Connect the two futures, if the src future is done, then mark the dst future as done. And if
+ // the dst future is done, then cancel the src future. This is used for timeline consistent read.
+ private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
+ addListener(srcFuture, (r, e) -> {
+ if (e != null) {
+ dstFuture.completeExceptionally(e);
+ } else {
+ dstFuture.complete(r);
+ }
+ });
+ // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
+ // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst
+ // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in
+ // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
+ // tie.
+ addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
+ }
+
+ private void timelineConsistentGet(Get get, RegionLocations locs,
+ CompletableFuture<Result> future) {
+ if (future.isDone()) {
+ // do not send requests to secondary replicas if the future is done, i.e, the primary request
+ // has already been finished.
+ return;
+ }
+ for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
+ CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs);
+ connect(secondaryFuture, future);
+ }
+ }
+
@Override
public CompletableFuture<Result> get(Get get) {
- return this.<Result> newCaller(get, readRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl
- .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
- RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
- (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
- .call();
+ CompletableFuture<Result> primaryFuture =
+ get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
+ if (get.getConsistency() == Consistency.STRONG) {
+ return primaryFuture;
+ }
+ // Timeline consistent read, where we will send requests to other region replicas
+ CompletableFuture<Result> future = new CompletableFuture<>();
+ connect(primaryFuture, future);
+ long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
+ long startNs = System.nanoTime();
+ addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
+ RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
+ if (error != null) {
+ LOG.warn(
+ "Failed to locate all the replicas for table={}, row='{}'," +
+ " give up timeline consistent read",
+ tableName, Bytes.toStringBinary(get.getRow()), error);
+ return;
+ }
+ if (locs.size() <= 1) {
+ LOG.warn(
+ "There are no secondary replicas for region {}," + " give up timeline consistent read",
+ locs.getDefaultRegionLocation().getRegion());
+ return;
+ }
+ long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
+ if (delayNs <= 0) {
+ timelineConsistentGet(get, locs, future);
+ } else {
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(
+ timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
+ }
+ });
+ return future;
}
@Override
public CompletableFuture<Void> put(Put put) {
return this.<Void> newCaller(put, writeRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
- put, RequestConverter::buildMutateRequest))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
+ put, RequestConverter::buildMutateRequest))
+ .call();
}
@Override
public CompletableFuture<Void> delete(Delete delete) {
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
- stub, delete, RequestConverter::buildMutateRequest))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
+ stub, delete, RequestConverter::buildMutateRequest))
+ .call();
}
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
return this.<Result> newCaller(append, rpcTimeoutNs)
- .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
- append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
- .call();
+ .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
+ append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+ .call();
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
return this.<Result> newCaller(increment, rpcTimeoutNs)
- .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
- stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
- .call();
+ .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
+ stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+ .call();
}
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -313,36 +386,36 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
- loc, stub, put,
- (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
- (c, r) -> r.getProcessed()))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+ stub, put,
+ (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
+ (c, r) -> r.getProcessed()))
+ .call();
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
- loc, stub, delete,
- (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
- (c, r) -> r.getProcessed()))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
+ loc, stub, delete,
+ (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
+ (c, r) -> r.getProcessed()))
+ .call();
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
- stub, mutation,
- (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
- resp -> resp.getExists()))
- .call();
+ .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
+ stub, mutation,
+ (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
+ resp -> resp.getExists()))
+ .call();
}
}
@@ -375,10 +448,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (ex != null) {
future.completeExceptionally(ex instanceof IOException ? ex
: new IOException(
- "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
+ "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
} else {
future.complete(respConverter
- .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
+ .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
}
} catch (IOException e) {
future.completeExceptionally(e);
@@ -399,7 +472,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- }, resp -> null)).call();
+ }, resp -> null))
+ .call();
}
private Scan setDefaultScanConfig(Scan scan) {
@@ -416,7 +490,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
- maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+ maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
}
private long resultSize2CacheSize(long maxResultSize) {
@@ -427,8 +501,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public ResultScanner getScanner(Scan scan) {
return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
- resultSize2CacheSize(
- scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+ resultSize2CacheSize(
+ scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
}
@Override
@@ -477,14 +551,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
- .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
+ .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
return conn.callerFactory.batch().table(tableName).actions(actions)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
}
@Override
@@ -515,7 +589,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
- region, row, rpcTimeoutNs, operationTimeoutNs);
+ region, row, rpcTimeoutNs, operationTimeoutNs);
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
@@ -553,10 +627,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
- ServiceCaller<S, R> callable, CoprocessorCallback<R> callback,
- List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
- AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
- Throwable error) {
+ ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
+ byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
+ AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
if (error != null) {
callback.onError(error);
return;
@@ -566,11 +639,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (locateFinished(region, endKey, endKeyInclusive)) {
locateFinished.set(true);
} else {
- conn.getLocator()
- .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
- operationTimeoutNs)
- .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
- endKeyInclusive, locateFinished, unfinishedRequest, l, e));
+ addListener(
+ conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
+ operationTimeoutNs),
+ (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
+ locateFinished, unfinishedRequest, l, e));
}
coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
if (e != null) {
@@ -630,11 +703,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public void execute() {
- conn.getLocator().getRegionLocation(tableName, startKey,
- startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
- .whenComplete(
- (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
- endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+ addListener(conn.getLocator().getRegionLocation(tableName, startKey,
+ startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
+ (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
+ endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
new file mode 100644
index 0000000..067e66b
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for processing futures.
+ */
+@InterfaceAudience.Private
+public final class FutureUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
+
+ private FutureUtils() {
+ }
+
+ /**
+ * This is method is used when you just want to add a listener to the given future. We will call
+ * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
+ * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
+ * suppress exceptions thrown from the code that completes the future, and this method will catch
+ * all the exception thrown from the {@code action} to catch possible code bugs.
+ * <p/>
+ * And the error phone check will always report FutureReturnValueIgnored because every method in
+ * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
+ * have one future that has not been checked. So we introduce this method and add a suppress
+ * warnings annotation here.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public static <T> void addListener(CompletableFuture<T> future,
+ BiConsumer<? super T, ? super Throwable> action) {
+ future.whenComplete((resp, error) -> {
+ try {
+ action.accept(resp, error);
+ } catch (Throwable t) {
+ LOG.error("Unexpected error caught when processing CompletableFuture", t);
+ }
+ });
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
new file mode 100644
index 0000000..c14f69f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+final class RegionReplicaTestHelper {
+
+ private RegionReplicaTestHelper() {
+ }
+
+ // waits for all replicas to have region location
+ static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry,
+ int regionReplication) throws IOException {
+ TestZKAsyncRegistry.TEST_UTIL.waitFor(
+ TestZKAsyncRegistry.TEST_UTIL.getConfiguration()
+ .getLong("hbase.client.sync.wait.timeout.msec", 60000),
+ 200, true, new ExplainingPredicate<IOException>() {
+ @Override
+ public String explainFailure() throws IOException {
+ return "Not all meta replicas get assigned";
+ }
+
+ @Override
+ public boolean evaluate() throws IOException {
+ try {
+ RegionLocations locs = registry.getMetaRegionLocation().get();
+ if (locs.size() < regionReplication) {
+ return false;
+ }
+ for (int i = 0; i < regionReplication; i++) {
+ if (locs.getRegionLocation(i) == null) {
+ return false;
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e);
+ return false;
+ }
+ }
+ });
+ }
+
+ static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName,
+ int replicaId) {
+ return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .filter(rs -> rs.getRegions(tableName).stream()
+ .anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId))
+ .findAny().map(rs -> rs.getServerName());
+ }
+
+ /**
+ * Return the new location.
+ */
+ static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc)
+ throws Exception {
+ ServerName serverName = currentLoc.getServerName();
+ RegionInfo regionInfo = currentLoc.getRegion();
+ TableName tableName = regionInfo.getTable();
+ int replicaId = regionInfo.getReplicaId();
+ ServerName newServerName = util.getHBaseCluster().getRegionServerThreads().stream()
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+ .get();
+ util.getAdmin().move(regionInfo.getEncodedNameAsBytes(),
+ Bytes.toBytes(newServerName.getServerName()));
+ util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ Optional<ServerName> newServerName = getRSCarryingReplica(util, tableName, replicaId);
+ return newServerName.isPresent() && !newServerName.get().equals(serverName);
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return regionInfo.getRegionNameAsString() + " is still on " + serverName;
+ }
+ });
+ return newServerName;
+ }
+
+ interface Locator {
+ RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+ throws Exception;
+
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
+ }
+
+ static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator)
+ throws Exception {
+ RegionLocations locs =
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false);
+ assertEquals(3, locs.size());
+ for (int i = 0; i < 3; i++) {
+ HRegionLocation loc = locs.getRegionLocation(i);
+ assertNotNull(loc);
+ ServerName serverName = getRSCarryingReplica(util, tableName, i).get();
+ assertEquals(serverName, loc.getServerName());
+ }
+ ServerName newServerName = moveRegion(util, locs.getDefaultRegionLocation());
+ // The cached location should not be changed
+ assertEquals(locs.getDefaultRegionLocation().getServerName(),
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+ .getDefaultRegionLocation().getServerName());
+ // should get the new location when reload = true
+ assertEquals(newServerName,
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
+ .getDefaultRegionLocation().getServerName());
+ // the cached location should be replaced
+ assertEquals(newServerName,
+ locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+ .getDefaultRegionLocation().getServerName());
+
+ ServerName newServerName1 = moveRegion(util, locs.getRegionLocation(1));
+ ServerName newServerName2 = moveRegion(util, locs.getRegionLocation(2));
+
+ // The cached location should not be change
+ assertEquals(locs.getRegionLocation(1).getServerName(),
+ locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+ // clear the cached location for replica 1
+ locator.updateCachedLocationOnError(locs.getRegionLocation(1), new NotServingRegionException());
+ // the cached location for replica 2 should not be changed
+ assertEquals(locs.getRegionLocation(2).getServerName(),
+ locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+ // should get the new location as we have cleared the old location
+ assertEquals(newServerName1,
+ locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+ // as we will get the new location for replica 2 at once, we should also get the new location
+ // for replica 2
+ assertEquals(newServerName2,
+ locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 7c08d6d..df1fe08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,20 +17,19 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
-import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -42,7 +41,7 @@ public class TestAsyncMetaRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
+ HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -53,10 +52,11 @@ public class TestAsyncMetaRegionLocator {
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
+ TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
- TEST_UTIL.waitUntilAllSystemRegionsAssigned();
- TEST_UTIL.getAdmin().setBalancerRunning(false, true);
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
+ TEST_UTIL.getAdmin().balancerSwitch(false, true);
LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
}
@@ -66,42 +66,21 @@ public class TestAsyncMetaRegionLocator {
TEST_UTIL.shutdownMiniCluster();
}
- private Optional<ServerName> getRSCarryingMeta() {
- return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer())
- .filter(rs -> !rs.getRegions(TableName.META_TABLE_NAME).isEmpty()).findAny()
- .map(rs -> rs.getServerName());
- }
-
@Test
- public void testReload() throws Exception {
- ServerName serverName = getRSCarryingMeta().get();
- assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
-
- ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
- .findAny().get();
- TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
- Bytes.toBytes(newServerName.getServerName()));
- TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+ public void test() throws Exception {
+ testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() {
@Override
- public boolean evaluate() throws Exception {
- Optional<ServerName> newServerName = getRSCarryingMeta();
- return newServerName.isPresent() && !newServerName.get().equals(serverName);
+ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+ throws Exception {
+ LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
- public String explainFailure() throws Exception {
- return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " +
- serverName;
+ public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+ throws Exception {
+ return LOCATOR.getRegionLocations(replicaId, reload).get();
}
});
- // The cached location will not change
- assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
- // should get the new location when reload = true
- assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName());
- // the cached location should be replaced
- assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 38dc78d..eeaf99f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -38,10 +39,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -58,7 +61,7 @@ public class TestAsyncNonMetaRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
+ HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -78,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
@@ -109,11 +112,18 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
}
+ private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
+ byte[] row, RegionLocateType locateType, boolean reload) {
+ return LOCATOR
+ .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
+ .thenApply(RegionLocations::getDefaultRegionLocation);
+ }
+
@Test
public void testNoTable() throws InterruptedException {
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@@ -126,7 +136,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@@ -148,13 +158,13 @@ public class TestAsyncNonMetaRegionLocator {
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
ThreadLocalRandom.current().nextBytes(randKey);
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get());
}
}
@@ -179,12 +189,12 @@ public class TestAsyncNonMetaRegionLocator {
private ServerName[] getLocations(byte[][] startKeys) {
ServerName[] serverNames = new ServerName[startKeys.length];
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
- .forEach(rs -> {
- rs.getRegions(TABLE_NAME).forEach(r -> {
- serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
- Bytes::compareTo)] = rs.getServerName();
- });
+ .forEach(rs -> {
+ rs.getRegions(TABLE_NAME).forEach(r -> {
+ serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+ Bytes::compareTo)] = rs.getServerName();
});
+ });
return serverNames;
}
@@ -196,8 +206,9 @@ public class TestAsyncNonMetaRegionLocator {
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
- serverNames[i], LOCATOR
- .getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get());
+ serverNames[i],
+ getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false)
+ .get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -208,7 +219,7 @@ public class TestAsyncNonMetaRegionLocator {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
serverNames[i],
- LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
+ getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -220,8 +231,7 @@ public class TestAsyncNonMetaRegionLocator {
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
try {
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
- LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false)
- .get());
+ getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -232,29 +242,29 @@ public class TestAsyncNonMetaRegionLocator {
public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
createSingleRegionTable();
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
- HRegionLocation loc = LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
+ HRegionLocation loc =
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
- .findAny().get();
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+ .get();
TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()),
Bytes.toBytes(newServerName.getServerName()));
while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
- .equals(newServerName)) {
+ .equals(newServerName)) {
Thread.sleep(100);
}
// Should be same as it is in cache
- assertSame(loc, LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
- LOCATOR.updateCachedLocation(loc, null);
+ assertSame(loc,
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+ LOCATOR.updateCachedLocationOnError(loc, null);
// null error will not trigger a cache cleanup
- assertSame(loc, LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
- LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
- assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR
- .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+ assertSame(loc,
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+ LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException());
+ assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
}
// usually locate after will return the same result, so we add a test to make it return different
@@ -266,21 +276,21 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
TEST_UTIL.waitTableAvailable(TABLE_NAME);
HRegionLocation currentLoc =
- LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
+ getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
HRegionLocation afterLoc =
- LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
+ getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
ServerName afterServerName =
- TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
- .filter(rs -> rs.getRegions(TABLE_NAME).stream()
- .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
- .findAny().get().getServerName();
+ TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .filter(rs -> rs.getRegions(TABLE_NAME).stream()
+ .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
+ .findAny().get().getServerName();
assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
assertSame(afterLoc,
- LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
+ getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
}
// For HBASE-17402
@@ -292,9 +302,9 @@ public class TestAsyncNonMetaRegionLocator {
ServerName[] serverNames = getLocations(startKeys);
for (int i = 0; i < 100; i++) {
LOCATOR.clearCache(TABLE_NAME);
- List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000)
- .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
- .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
+ List<CompletableFuture<HRegionLocation>> futures =
+ IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
+ .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
.collect(toList());
for (int j = 0; j < 1000; j++) {
int index = Math.min(8, j / 111);
@@ -309,11 +319,11 @@ public class TestAsyncNonMetaRegionLocator {
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
- .findAny().get();
+ .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+ .get();
Admin admin = TEST_UTIL.getAdmin();
RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get();
admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName()));
@@ -334,15 +344,15 @@ public class TestAsyncNonMetaRegionLocator {
// The cached location will not change
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
// should get the new location when reload = true
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
// the cached location should be replaced
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
}
@@ -351,10 +361,32 @@ public class TestAsyncNonMetaRegionLocator {
public void testLocateBeforeLastRegion()
throws IOException, InterruptedException, ExecutionException {
createMultiRegionTable();
- LOCATOR.getRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
+ getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
HRegionLocation loc =
- LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
+ getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
// should locate to the last region
assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW);
}
+
+ @Test
+ public void testRegionReplicas() throws Exception {
+ TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build());
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+ testLocator(TEST_UTIL, TABLE_NAME, new Locator() {
+
+ @Override
+ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+ throws Exception {
+ LOCATOR.updateCachedLocationOnError(loc, error);
+ }
+
+ @Override
+ public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+ throws Exception {
+ return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+ RegionLocateType.CURRENT, reload).get();
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index c6624e7..8cdb4a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -59,7 +60,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+ HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
- .toArray(byte[][]::new);
+ .toArray(byte[][]::new);
TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
}
@@ -138,11 +139,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.shutdownMiniCluster();
}
- private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
+ private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
throws InterruptedException, ExecutionException {
assertEquals(256, futures.size());
for (int i = 0; i < futures.size(); i++) {
- HRegionLocation loc = futures.get(i).get();
+ HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
if (i == 0) {
assertTrue(isEmptyStartRow(loc.getRegion().getStartKey()));
} else {
@@ -158,10 +159,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@Test
public void test() throws InterruptedException, ExecutionException {
- List<CompletableFuture<HRegionLocation>> futures =
- IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
- .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
- .collect(toList());
+ List<CompletableFuture<RegionLocations>> futures =
+ IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+ .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+ RegionLocateType.CURRENT, false))
+ .collect(toList());
assertLocs(futures);
assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(),
MAX_CONCURRENCY.get() <= MAX_ALLOWED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index a6c2efb..7d8956b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -49,7 +49,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
+ HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
- registry.getClusterId().get(), User.getCurrent());
+ registry.getClusterId().get(), User.getCurrent());
}
@AfterClass
@@ -89,8 +89,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(
TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
- AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME)
- .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build();
+ AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
+ .setMaxRetries(30).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// move back
@@ -110,8 +110,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
public void testMaxRetries() throws IOException, InterruptedException {
try {
CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
- .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
- .action((controller, loc, stub) -> failedFuture()).call().get();
+ .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
+ .action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
@@ -123,8 +123,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
long startNs = System.nanoTime();
try {
CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
- .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
- .action((controller, loc, stub) -> failedFuture()).call().get();
+ .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
+ .action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
e.printStackTrace();
@@ -141,30 +141,30 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
AtomicInteger count = new AtomicInteger(0);
HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
AsyncRegionLocator mockedLocator =
- new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
- @Override
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType locateType, long timeoutNs) {
- if (tableName.equals(TABLE_NAME)) {
- CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- if (count.getAndIncrement() == 0) {
- errorTriggered.set(true);
- future.completeExceptionally(new RuntimeException("Inject error!"));
- } else {
- future.complete(loc);
- }
- return future;
+ new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
+ @Override
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ int replicaId, RegionLocateType locateType, long timeoutNs) {
+ if (tableName.equals(TABLE_NAME)) {
+ CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+ if (count.getAndIncrement() == 0) {
+ errorTriggered.set(true);
+ future.completeExceptionally(new RuntimeException("Inject error!"));
} else {
- return super.getRegionLocation(tableName, row, locateType, timeoutNs);
+ future.complete(loc);
}
+ return future;
+ } else {
+ return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
}
+ }
- @Override
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
- }
- };
+ @Override
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ }
+ };
try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
- CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
+ CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
@@ -172,7 +172,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
}
}) {
AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
- .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
+ .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertTrue(errorTriggered.get());
errorTriggered.set(false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 13d8000..6c6bb98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -69,8 +69,8 @@ public class TestAsyncTableLocatePrefetch {
@Test
public void test() throws InterruptedException, ExecutionException {
- assertNotNull(LOCATOR
- .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get());
+ assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
+ RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
// we finish the request before we adding the remaining results to cache so sleep a bit here
Thread.sleep(1000);
// confirm that the locations of all the regions have been cached.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
new file mode 100644
index 0000000..0445a0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionReplicasGet {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+ private static byte[] ROW = Bytes.toBytes("row");
+
+ private static byte[] VALUE = Bytes.toBytes("value");
+
+ private static AsyncConnection ASYNC_CONN;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameter
+ public Supplier<AsyncTable<?>> getTable;
+
+ private static AsyncTable<?> getRawTable() {
+ return ASYNC_CONN.getTable(TABLE_NAME);
+ }
+
+ private static AsyncTable<?> getTable() {
+ return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+ }
+
+ @Parameters
+ public static List<Object[]> params() {
+ return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
+ new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
+ }
+
+ private static volatile boolean FAIL_PRIMARY_GET = false;
+
+ private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
+
+ private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
+
+ public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+ List<Cell> result) throws IOException {
+ RegionInfo region = c.getEnvironment().getRegionInfo();
+ if (!region.getTable().equals(TABLE_NAME)) {
+ return;
+ }
+ if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+ SECONDARY_GET_COUNT.incrementAndGet();
+ } else {
+ PRIMARY_GET_COUNT.incrementAndGet();
+ if (FAIL_PRIMARY_GET) {
+ throw new IOException("Inject error");
+ }
+ }
+ }
+ }
+
+ private static boolean allReplicasHaveRow() throws IOException {
+ for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
+ if (region.get(new Get(ROW), false).isEmpty()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // 10 mins
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+ TimeUnit.MINUTES.toMillis(10));
+ // 1 second
+ TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
+ TimeUnit.SECONDS.toMicros(1));
+ // set a small pause so we will retry very quickly
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+ // infinite retry
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.getAdmin()
+ .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
+ .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+ table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+ // this is the fastest way to let all replicas have the row
+ TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+ TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
+ TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ IOUtils.closeQuietly(ASYNC_CONN);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testNoReplicaRead() throws Exception {
+ FAIL_PRIMARY_GET = false;
+ SECONDARY_GET_COUNT.set(0);
+ AsyncTable<?> table = getTable.get();
+ Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+ for (int i = 0; i < 1000; i++) {
+ assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+ }
+ // the primary region is fine and the primary timeout is 1 second which is long enough, so we
+ // should not send any requests to secondary replicas even if the consistency is timeline.
+ Thread.sleep(5000);
+ assertEquals(0, SECONDARY_GET_COUNT.get());
+ }
+
+ @Test
+ public void testReplicaRead() throws Exception {
+ // fail the primary get request
+ FAIL_PRIMARY_GET = true;
+ Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+ // make sure that we could still get the value from secondary replicas
+ AsyncTable<?> table = getTable.get();
+ assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+ // make sure that the primary request has been canceled
+ Thread.sleep(5000);
+ int count = PRIMARY_GET_COUNT.get();
+ Thread.sleep(10000);
+ assertEquals(count, PRIMARY_GET_COUNT.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
index db7546f..46890d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNotSame;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
@@ -52,43 +50,13 @@ public class TestZKAsyncRegistry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
+ HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static ZKAsyncRegistry REGISTRY;
- // waits for all replicas to have region location
- static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException {
- TEST_UTIL.waitFor(
- TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
- new ExplainingPredicate<IOException>() {
- @Override
- public String explainFailure() throws IOException {
- return TEST_UTIL.explainTableAvailability(tbl);
- }
-
- @Override
- public boolean evaluate() throws IOException {
- AtomicBoolean ready = new AtomicBoolean(true);
- try {
- RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
- assertEquals(3, locs.getRegionLocations().length);
- IntStream.range(0, 3).forEach(i -> {
- HRegionLocation loc = locs.getRegionLocation(i);
- if (loc == null) {
- ready.set(false);
- }
- });
- } catch (Exception e) {
- ready.set(false);
- }
- return ready.get();
- }
- });
- }
-
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
@@ -107,14 +75,14 @@ public class TestZKAsyncRegistry {
LOG.info("STARTED TEST");
String clusterId = REGISTRY.getClusterId().get();
String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId();
- assertEquals("Expected " + expectedClusterId + ", found=" + clusterId,
- expectedClusterId, clusterId);
+ assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId,
+ clusterId);
assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(),
REGISTRY.getCurrentNrHRS().get().intValue());
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getMasterAddress().get());
assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
- waitUntilAllReplicasHavingRegionLocation(TableName.META_TABLE_NAME);
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {
[2/4] hbase git commit: HBASE-17356 Add replica get support
Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index a826f8c..579d547 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
@@ -484,23 +485,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
- this.<List<TableSchema>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
- controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
- c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
- .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!tableSchemas.isEmpty()) {
- future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
- }
- });
+ addListener(this.<List<TableSchema>> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
+ controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
+ (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+ (resp) -> resp.getTableSchemaList()))
+ .call(), (tableSchemas, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!tableSchemas.isEmpty()) {
+ future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
+ }
+ });
return future;
}
@@ -583,7 +584,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -600,7 +601,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -629,40 +630,37 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
Optional<byte[][]> splitKeys) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- isTableEnabled(tableName).whenComplete(
- (enabled, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!enabled) {
- future.complete(false);
- } else {
- AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
- .whenComplete(
- (locations, error1) -> {
- if (error1 != null) {
- future.completeExceptionally(error1);
- return;
- }
- List<HRegionLocation> notDeployedRegions =
- locations.stream().filter(loc -> loc.getServerName() == null)
- .collect(Collectors.toList());
- if (notDeployedRegions.size() > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " has " + notDeployedRegions.size()
- + " regions");
- }
- future.complete(false);
- return;
- }
+ addListener(isTableEnabled(tableName), (enabled, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!enabled) {
+ future.complete(false);
+ } else {
+ addListener(
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
+ (locations, error1) -> {
+ if (error1 != null) {
+ future.completeExceptionally(error1);
+ return;
+ }
+ List<HRegionLocation> notDeployedRegions = locations.stream()
+ .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
+ if (notDeployedRegions.size() > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
+ }
+ future.complete(false);
+ return;
+ }
- Optional<Boolean> available =
- splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
- future.complete(available.orElse(true));
- });
- }
- });
+ Optional<Boolean> available =
+ splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
+ future.complete(available.orElse(true));
+ });
+ }
+ });
return future;
}
@@ -784,20 +782,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flush(TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
+ addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (!exists) {
future.completeExceptionally(new TableNotFoundException(tableName));
} else {
- isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+ addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else if (!tableEnabled) {
future.completeExceptionally(new TableNotEnabledException(tableName));
} else {
- execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
- new HashMap<>()).whenComplete((ret, err3) -> {
+ addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+ new HashMap<>()), (ret, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
@@ -814,27 +812,25 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
}
- flush(serverName, location.getRegion())
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
});
+ });
return future;
}
@@ -852,7 +848,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flushRegionServer(ServerName sn) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegions(sn).whenComplete((hRegionInfos, err) -> {
+ addListener(getRegions(sn), (hRegionInfos, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
@@ -861,9 +857,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
}
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
+ addListener(CompletableFuture.allOf(
+ compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@@ -936,7 +931,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegions(sn).whenComplete((hRegionInfos, err) -> {
+ addListener(getRegions(sn), (hRegionInfos, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
@@ -945,15 +940,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
}
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(CompletableFuture.allOf(
+ compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
});
return future;
}
@@ -961,28 +955,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
boolean major) {
CompletableFuture<Void> future = new CompletableFuture<>();
-
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- compact(location.getServerName(), location.getRegion(), major, columnFamily)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@@ -994,19 +986,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
// For meta table, we use zk to fetch all locations.
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
- registry.getMetaRegionLocation().whenComplete(
- (metaRegions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (metaRegions == null || metaRegions.isEmpty()
- || metaRegions.getDefaultRegionLocation() == null) {
- future.completeExceptionally(new IOException("meta region does not found"));
- } else {
- future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
- }
- // close the registry.
- IOUtils.closeQuietly(registry);
- });
+ addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (metaRegions == null || metaRegions.isEmpty() ||
+ metaRegions.getDefaultRegionLocation() == null) {
+ future.completeExceptionally(new IOException("meta region does not found"));
+ } else {
+ future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+ }
+ // close the registry.
+ IOUtils.closeQuietly(registry);
+ });
return future;
} else {
// For non-meta table, we fetch all locations by scanning hbase:meta table
@@ -1017,40 +1008,40 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
/**
* Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
*/
- private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
- boolean major, CompactType compactType) {
+ private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
+ CompactType compactType) {
CompletableFuture<Void> future = new CompletableFuture<>();
switch (compactType) {
case MOB:
- connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+ addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
- compact(serverName, regionInfo, major, columnFamily)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
});
break;
case NORMAL:
- getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+ addListener(getTableHRegionLocations(tableName), (locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
- CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null)
+ CompletableFuture<?>[] compactFutures =
+ locations.stream().filter(l -> l.getRegion() != null)
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
.map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
.toArray(CompletableFuture<?>[]::new);
// future complete unless all of the compact futures are completed.
- CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
+ addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@@ -1091,29 +1082,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
CompletableFuture<TableName> result) {
- getRegionLocation(encodeRegionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- return;
- }
- RegionInfo regionInfo = location.getRegion();
- if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- result.completeExceptionally(new IllegalArgumentException(
- "Can't invoke merge on non-default regions directly"));
- return;
- }
- if (!tableName.compareAndSet(null, regionInfo.getTable())) {
- if (!tableName.get().equals(regionInfo.getTable())) {
- // tables of this two region should be same.
- result.completeExceptionally(new IllegalArgumentException(
- "Cannot merge regions from two different tables " + tableName.get() + " and "
- + regionInfo.getTable()));
- } else {
- result.complete(tableName.get());
- }
+ addListener(getRegionLocation(encodeRegionName), (location, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ return;
+ }
+ RegionInfo regionInfo = location.getRegion();
+ if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ result.completeExceptionally(
+ new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
+ return;
+ }
+ if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+ if (!tableName.get().equals(regionInfo.getTable())) {
+ // tables of this two region should be same.
+ result.completeExceptionally(
+ new IllegalArgumentException("Cannot merge regions from two different tables " +
+ tableName.get() + " and " + regionInfo.getTable()));
+ } else {
+ result.complete(tableName.get());
}
- });
+ }
+ });
}
private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
@@ -1178,41 +1168,42 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
- checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
- .whenComplete((tableName, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB),
+ (tableName, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
- MergeTableRegionsRequest request = null;
- try {
- request = RequestConverter.buildMergeTableRegionsRequest(
- new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
- ng.newNonce());
- } catch (DeserializationException e) {
- future.completeExceptionally(e);
- return;
- }
+ MergeTableRegionsRequest request = null;
+ try {
+ request = RequestConverter.buildMergeTableRegionsRequest(
+ new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
+ ng.newNonce());
+ } catch (DeserializationException e) {
+ future.completeExceptionally(e);
+ return;
+ }
+ addListener(
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
- new MergeTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
-
- });
+ new MergeTableRegionProcedureBiConsumer(tableName)),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> split(TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exist, error) -> {
+ addListener(tableExists(tableName), (exist, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -1221,45 +1212,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(new TableNotFoundException(tableName));
return;
}
- metaTable
+ addListener(
+ metaTable
.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
- .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
- .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
- .whenComplete((results, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (results != null && !results.isEmpty()) {
- List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
- for (Result r : results) {
- if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) continue;
- RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
- if (rl != null) {
- for (HRegionLocation h : rl.getRegionLocations()) {
- if (h != null && h.getServerName() != null) {
- RegionInfo hri = h.getRegion();
- if (hri == null || hri.isSplitParent()
- || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID)
- continue;
- splitFutures.add(split(hri, null));
+ .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
+ .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
+ (results, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (results != null && !results.isEmpty()) {
+ List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
+ for (Result r : results) {
+ if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
+ continue;
+ }
+ RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+ if (rl != null) {
+ for (HRegionLocation h : rl.getRegionLocations()) {
+ if (h != null && h.getServerName() != null) {
+ RegionInfo hri = h.getRegion();
+ if (hri == null || hri.isSplitParent() ||
+ hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ continue;
}
+ splitFutures.add(split(hri, null));
}
}
}
- CompletableFuture
- .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
- .whenComplete((ret, exception) -> {
- if (exception != null) {
- future.completeExceptionally(exception);
- return;
- }
- future.complete(ret);
- });
- } else {
- future.complete(null);
}
- });
+ addListener(
+ CompletableFuture
+ .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
+ (ret, exception) -> {
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ return;
+ }
+ future.complete(ret);
+ });
+ } else {
+ future.complete(null);
+ }
+ });
});
return future;
}
@@ -1270,54 +1266,52 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (splitPoint == null) {
return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
}
- connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
- .whenComplete((loc, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- } else if (loc == null || loc.getRegion() == null) {
- result.completeExceptionally(new IllegalArgumentException(
- "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
- } else {
- splitRegion(loc.getRegion().getRegionName(), splitPoint)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- result.completeExceptionally(err2);
- } else {
- result.complete(ret);
- }
+ addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint),
+ (loc, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ } else if (loc == null || loc.getRegion() == null) {
+ result.completeExceptionally(new IllegalArgumentException(
+ "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
+ } else {
+ addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
+ if (err2 != null) {
+ result.completeExceptionally(err2);
+ } else {
+ result.complete(ret);
+ }
- });
- }
- });
+ });
+ }
+ });
return result;
}
@Override
public CompletableFuture<Void> splitRegion(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- RegionInfo regionInfo = location.getRegion();
- if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- future.completeExceptionally(new IllegalArgumentException(
- "Can't split replicas directly. "
- + "Replicas are auto-split when their primary is split."));
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ RegionInfo regionInfo = location.getRegion();
+ if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ future
+ .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+ "Replicas are auto-split when their primary is split."));
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(split(regionInfo, null), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
}
- split(regionInfo, null).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
});
+ });
return future;
}
@@ -1326,35 +1320,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
Preconditions.checkNotNull(splitPoint,
"splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- RegionInfo regionInfo = location.getRegion();
- if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- future.completeExceptionally(new IllegalArgumentException(
- "Can't split replicas directly. "
- + "Replicas are auto-split when their primary is split."));
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- if (regionInfo.getStartKey() != null
- && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
- future.completeExceptionally(new IllegalArgumentException(
- "should not give a splitkey which equals to startkey!"));
- return;
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ RegionInfo regionInfo = location.getRegion();
+ if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ future
+ .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+ "Replicas are auto-split when their primary is split."));
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ if (regionInfo.getStartKey() != null &&
+ Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
+ future.completeExceptionally(
+ new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
+ return;
+ }
+ addListener(split(regionInfo, splitPoint), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
}
- split(regionInfo, splitPoint).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
});
+ });
return future;
}
@@ -1363,121 +1356,119 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
TableName tableName = hri.getTable();
SplitTableRegionRequest request = null;
try {
- request = RequestConverter
- .buildSplitTableRegionRequest(hri, splitPoint,
- ng.getNonceGroup(), ng.newNonce());
+ request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
+ ng.newNonce());
} catch (DeserializationException e) {
future.completeExceptionally(e);
return future;
}
- this.<SplitTableRegionRequest, SplitTableRegionResponse>procedureCall(request,
- (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
- new SplitTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
+ (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
+ new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
return future;
}
@Override
public CompletableFuture<Void> assign(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
- controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(this.<Void> newMasterCaller()
+ .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
+ (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
+ .call(), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(
this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this
- .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
- RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
- (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ .action(((controller, stub) -> this
+ .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+ RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
+ (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
+ .call(),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> offline(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(
this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
- controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ .action(((controller, stub) -> this
+ .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
+ RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
+ (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
+ .call(),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> move(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(
moveRegion(
- RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@@ -1486,20 +1477,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
Preconditions.checkNotNull(destServerName,
"destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete((regionInfo, err) -> {
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
- moveRegion(
- RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(moveRegion(RequestConverter
+ .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
});
return future;
}
@@ -1634,11 +1625,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+ addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
ReplicationPeerConfig newPeerConfig =
- ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
- updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
+ ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+ addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
if (!completeExceptionally(future, error)) {
future.complete(result);
}
@@ -1656,24 +1647,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete(
- (peerConfig, error) -> {
- if (!completeExceptionally(future, error)) {
- ReplicationPeerConfig newPeerConfig = null;
- try {
- newPeerConfig = ReplicationPeerConfigUtil
- .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
- } catch (ReplicationException e) {
- future.completeExceptionally(e);
- return;
- }
- updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
- if (!completeExceptionally(future, error)) {
- future.complete(result);
- }
- });
+ addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
+ if (!completeExceptionally(future, error)) {
+ ReplicationPeerConfig newPeerConfig = null;
+ try {
+ newPeerConfig = ReplicationPeerConfigUtil
+ .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+ } catch (ReplicationException e) {
+ future.completeExceptionally(e);
+ return;
}
- });
+ addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
+ if (!completeExceptionally(future, error)) {
+ future.complete(result);
+ }
+ });
+ }
+ });
return future;
}
@@ -1708,31 +1698,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
- listTableDescriptors().whenComplete(
- (tables, error) -> {
- if (!completeExceptionally(future, error)) {
- List<TableCFs> replicatedTableCFs = new ArrayList<>();
- tables.forEach(table -> {
- Map<String, Integer> cfs = new HashMap<>();
- Stream.of(table.getColumnFamilies())
- .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
- .forEach(column -> {
- cfs.put(column.getNameAsString(), column.getScope());
- });
- if (!cfs.isEmpty()) {
- replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
- }
- });
- future.complete(replicatedTableCFs);
- }
- });
+ addListener(listTableDescriptors(), (tables, error) -> {
+ if (!completeExceptionally(future, error)) {
+ List<TableCFs> replicatedTableCFs = new ArrayList<>();
+ tables.forEach(table -> {
+ Map<String, Integer> cfs = new HashMap<>();
+ Stream.of(table.getColumnFamilies())
+ .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+ .forEach(column -> {
+ cfs.put(column.getNameAsString(), column.getScope());
+ });
+ if (!cfs.isEmpty()) {
+ replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+ }
+ });
+ future.complete(replicatedTableCFs);
+ }
+ });
return future;
}
@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
- SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
- .createHBaseProtosSnapshotDesc(snapshotDesc);
+ SnapshotProtos.SnapshotDescription snapshot =
+ ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
try {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
} catch (IllegalArgumentException e) {
@@ -1740,47 +1729,47 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<Void> future = new CompletableFuture<>();
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
- this.<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
- stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
- resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime = ConnectionUtils.getPauseTime(
- TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ addListener(this.<Long> newMasterCaller()
+ .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
+ stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+ resp -> resp.getExpectedTimeout()))
+ .call(), (expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime =
+ ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER
- .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+ TimeUnit.MILLISECONDS);
}
- } );
- } else {
- future.completeExceptionally(new SnapshotCreationException("Snapshot '"
- + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
- + " ms", snapshotDesc));
- }
+ });
+ } else {
+ future.completeExceptionally(
+ new SnapshotCreationException("Snapshot '" + snapshot.getName() +
+ "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
}
- };
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
- });
+ }
+ };
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
return future;
}
@@ -1806,52 +1795,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
- public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
+ public CompletableFuture<Void> restoreSnapshot(String snapshotName,
+ boolean takeFailSafeSnapshot) {
CompletableFuture<Void> future = new CompletableFuture<>();
- listSnapshots(Pattern.compile(snapshotName)).whenComplete(
- (snapshotDescriptions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TableName tableName = null;
- if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
- for (SnapshotDescription snap : snapshotDescriptions) {
- if (snap.getName().equals(snapshotName)) {
- tableName = snap.getTableName();
- break;
- }
+ addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TableName tableName = null;
+ if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+ for (SnapshotDescription snap : snapshotDescriptions) {
+ if (snap.getName().equals(snapshotName)) {
+ tableName = snap.getTableName();
+ break;
}
}
- if (tableName == null) {
- future.completeExceptionally(new RestoreSnapshotException(
- "Unable to find the table name for snapshot=" + snapshotName));
- return;
- }
- final TableName finalTableName = tableName;
- tableExists(finalTableName)
- .whenComplete((exists, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (!exists) {
- // if table does not exist, then just clone snapshot into new table.
- completeConditionalOnFuture(future,
- internalRestoreSnapshot(snapshotName, finalTableName));
+ }
+ if (tableName == null) {
+ future.completeExceptionally(new RestoreSnapshotException(
+ "Unable to find the table name for snapshot=" + snapshotName));
+ return;
+ }
+ final TableName finalTableName = tableName;
+ addListener(tableExists(finalTableName), (exists, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (!exists) {
+ // if table does not exist, then just clone snapshot into new table.
+ completeConditionalOnFuture(future,
+ internalRestoreSnapshot(snapshotName, finalTableName));
+ } else {
+ addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
+ if (err4 != null) {
+ future.completeExceptionally(err4);
+ } else if (!disabled) {
+ future.completeExceptionally(new TableNotDisabledException(finalTableName));
} else {
- isTableDisabled(finalTableName).whenComplete(
- (disabled, err4) -> {
- if (err4 != null) {
- future.completeExceptionally(err4);
- } else if (!disabled) {
- future.completeExceptionally(new TableNotDisabledException(finalTableName));
- } else {
- completeConditionalOnFuture(future,
- restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
- }
- });
+ completeConditionalOnFuture(future,
+ restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
}
- } );
+ });
+ }
});
+ });
return future;
}
@@ -1860,49 +1847,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (takeFailSafeSnapshot) {
CompletableFuture<Void> future = new CompletableFuture<>();
// Step.1 Take a snapshot of the current state
- String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
- HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
- HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
- final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
- .replace("{snapshot.name}", snapshotName)
+ String failSafeSnapshotSnapshotNameFormat =
+ this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+ HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+ final String failSafeSnapshotSnapshotName =
+ failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
.replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
.replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
+ addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
// Step.2 Restore snapshot
- internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
- if (err2 != null) {
- // Step.3.a Something went wrong during the restore and try to rollback.
- internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
- (void3, err3) -> {
- if (err3 != null) {
- future.completeExceptionally(err3);
- } else {
- String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
- + failSafeSnapshotSnapshotName + " succeeded.";
- future.completeExceptionally(new RestoreSnapshotException(msg));
- }
- });
- } else {
- // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
- LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
- (ret3, err3) -> {
- if (err3 != null) {
- LOG.error(
- "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
- future.completeExceptionally(err3);
- } else {
- future.complete(ret3);
- }
- });
+ addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> {
+ if (err2 != null) {
+ // Step.3.a Something went wrong during the restore and try to rollback.
+ addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName),
+ (void3, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ String msg =
+ "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
+ failSafeSnapshotSnapshotName + " succeeded.";
+ future.completeExceptionally(new RestoreSnapshotException(msg));
+ }
+ });
+ } else {
+ // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
+ LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+ addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
+ if (err3 != null) {
+ LOG.error(
+ "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
+ err3);
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(ret3);
+ }
+ });
+ }
+ });
}
- } );
- }
- } );
+ });
return future;
} else {
return internalRestoreSnapshot(snapshotName, tableName);
@@ -1911,7 +1899,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
CompletableFuture<T> parentFuture) {
- parentFuture.whenComplete((res, err) -> {
+ addListener(parentFuture, (res, err) -> {
if (err != null) {
dependentFuture.completeExceptionally(err);
} else {
@@ -1923,7 +1911,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
+ addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (exists) {
@@ -1993,31 +1981,29 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
Pattern tableNamePattern, Pattern snapshotNamePattern) {
CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
- listTableNames(tableNamePattern, false).whenComplete(
- (tableNames, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
+ addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (tableNames == null || tableNames.size() <= 0) {
+ future.complete(Collections.emptyList());
+ return;
+ }
+ addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
return;
}
- if (tableNames == null || tableNames.size() <= 0) {
+ if (snapshotDescList == null || snapshotDescList.isEmpty()) {
future.complete(Collections.emptyList());
return;
}
- getCompletedSnapshots(snapshotNamePattern).whenComplete(
- (snapshotDescList, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (snapshotDescList == null || snapshotDescList.isEmpty()) {
- future.complete(Collections.emptyList());
- return;
- }
- future.complete(snapshotDescList.stream()
- .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
- .collect(Collectors.toList()));
- });
+ future.complete(snapshotDescList.stream()
+ .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
+ .collect(Collectors.toList()));
});
+ });
return future;
}
@@ -2064,7 +2050,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
}
CompletableFuture<Void> future = new CompletableFuture<>();
- listSnapshotsFuture.whenComplete(((snapshotDescriptions, err) -> {
+ addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
@@ -2073,12 +2059,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.complete(null);
return;
}
- List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
- snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
- .add(internalDeleteSnapshot(snapDesc)));
- CompletableFuture.allOf(
- deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
- .thenAccept(v -> future.complete(v));
+ addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
+ .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
+ if (e != null) {
+ future.completeExceptionally(e);
+ } else {
+ future.complete(v);
+ }
+ });
}));
return future;
}
@@ -2100,50 +2088,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
Map<String, String> props) {
CompletableFuture<Void> future = new CompletableFuture<>();
ProcedureDescription procDesc =
- ProtobufUtil.buildProcedureDescription(signature, instance, props);
- this.<Long> newMasterCaller()
- .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
- controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
- (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
- .call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime = ConnectionUtils
- .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
- pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
- TimeUnit.MICROSECONDS);
- }
- });
- } else {
- future.completeExceptionally(new IOException("Procedure '" + signature + " : "
- + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
- }
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ addListener(this.<Long> newMasterCaller()
+ .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
+ controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
+ (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
+ .call(), (expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime =
+ ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ pauseTime = Math.min(pauseTime, maxPauseTime);
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+ TimeUnit.MICROSECONDS);
+ }
+ });
+ } else {
+ future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
+ instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
}
- };
- // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
- });
+ }
+ };
+ // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
return future;
}
@@ -2262,15 +2250,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
- future.whenComplete((location, err) -> {
+ addListener(future, (location, err) -> {
if (err != null) {
returnedFuture.completeExceptionally(err);
return;
}
if (!location.isPresent() || location.get().getRegion() == null) {
- returnedFuture.completeExceptionally(new UnknownRegionException(
- "Invalid region name or encoded region name: "
- + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+ returnedFuture.completeExceptionally(
+ new UnknownRegionException("Invalid region name or encoded region name: " +
+ Bytes.toStringBinary(regionNameOrEncodedRegionName)));
} else {
returnedFuture.complete(location.get());
}
@@ -2294,14 +2282,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
if (Bytes.equals(regionNameOrEncodedRegionName,
- RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
- || Bytes.equals(regionNameOrEncodedRegionName,
+ RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
+ Bytes.equals(regionNameOrEncodedRegionName,
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
}
CompletableFuture<RegionInfo> future = new CompletableFuture<>();
- getRegionLocation(regionNameOrEncodedRegionName).whenComplete((location, err) -> {
+ addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
@@ -2343,7 +2331,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
+ private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
abstract void onFinished();
@@ -2359,7 +2347,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
+ private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
protected final TableName tableName;
TableProcedureBiConsumer(TableName tableName) {
@@ -2384,7 +2372,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
+ private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
protected final String namespaceName;
NamespaceProcedureBiConsumer(String namespaceName) {
@@ -2408,7 +2396,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
CreateTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2420,7 +2408,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
super(tableName);
@@ -2450,7 +2438,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
TruncateTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2462,7 +2450,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
EnableTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2474,7 +2462,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
DisableTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2486,7 +2474,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
AddColumnFamilyProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2498,7 +2486,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2510,7 +2498,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2522,7 +2510,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+ private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
CreateNamespaceProcedureBiConsumer(String namespaceName) {
super(namespaceName);
@@ -2534,7 +2522,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+ private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
DeleteNamespaceProcedureBiConsumer(String namespaceName) {
super(namespaceName);
@@ -2546,7 +2534,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+ private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
ModifyNamespaceProcedureBiConsumer(String namespaceName) {
super(namespaceName);
@@ -2558,7 +2546,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
MergeTableRegionProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2570,7 +2558,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
SplitTableRegionProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2584,7 +2572,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
CompletableFuture<Void> future = new CompletableFuture<>();
- procFuture.whenComplete((procId, error) -> {
+ addListener(procFuture, (procId, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -2595,30 +2583,33 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
- this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this
- .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
- controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
- (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
- .call().whenComplete((response, error) -> {
- if (error != null) {
- LOG.warn("failed to get the procedure result procId={}", procId,
- ConnectionUtils.translateException(error));
- retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
- ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
- return;
- }
- if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
- retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
- ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
- return;
- }
- if (response.hasException()) {
- IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
- future.completeExceptionally(ioe);
- } else {
- future.complete(null);
- }
- });
+ addListener(
+ this.<GetProcedureResultResponse> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
+ controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
+ (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
+ .call(),
+ (response, error) -> {
+ if (error != null) {
+ LOG.warn("failed to get the procedure result procId={}", procId,
+ ConnectionUtils.translateException(error));
+ retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+ ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+ return;
+ }
+ if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
+ retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+ ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+ return;
+ }
+ if (response.hasException()) {
+ IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
+ future.completeExceptionally(ioe);
+ } else {
+ future.complete(null);
+ }
+ });
}
private <T> CompletableFuture<T> failedFuture(Throwable error) {
@@ -2700,24 +2691,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> updateConfiguration() {
CompletableFuture<Void> future = new CompletableFuture<Void>();
- getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS))
- .whenComplete((status, err) -> {
+ addListener(
+ getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
+ (status, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
List<CompletableFuture<Void>> futures = new ArrayList<>();
status.getLiveServerMetrics().keySet()
- .forEach(server -> futures.add(updateConfiguration(server)));
+ .forEach(server -> futures.add(updateConfiguration(server)));
futures.add(updateConfiguration(status.getMasterName()));
status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
- .whenComplete((result, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(result);
- }
- });
+ addListener(
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+ (result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(result);
+ }
+ });
}
});
return future;
@@ -2800,88 +2793,87 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
switch (compactType) {
case MOB:
- connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+ addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
- this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action(
- (controller, stub) -> this
- .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
+ addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
+ .action((controller, stub) -> this
+ .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
controller, stub,
RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
- (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)
- ).call().whenComplete((resp2, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- if (resp2.hasCompactionState()) {
- future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+ (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
+ .call(), (resp2, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
} else {
- future.complete(CompactionState.NONE);
+ if (resp2.hasCompactionState()) {
+ future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+ } else {
+ future.complete(CompactionState.NONE);
+ }
}
- }
- });
+ });
});
break;
case NORMAL:
- getTableHRegionLocations(tableName).whenComplete(
- (locations, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- List<CompactionState> regionStates = new ArrayList<>();
- List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
- locations.stream().filter(loc -> loc.getServerName() != null)
- .filter(loc -> loc.getRegion() != null)
- .filter(loc -> !loc.getRegion().isOffline())
- .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
- futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
- // If any region compaction state is MAJOR_AND_MINOR
- // the table compaction state is MAJOR_AND_MINOR, too.
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
- future.complete(regionState);
- } else {
- regionStates.add(regionState);
- }
- }));
- });
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
- .whenComplete((ret, err3) -> {
- // If future not completed, check all regions's compaction state
- if (!future.isCompletedExceptionally() && !future.isDone()) {
- CompactionState state = CompactionState.NONE;
- for (CompactionState regionState : regionStates) {
- switch (regionState) {
- case MAJOR:
- if (state == CompactionState.MINOR) {
- future.complete(CompactionState.MAJOR_AND_MINOR);
- } else {
- state = CompactionState.MAJOR;
- }
- break;
- case MINOR:
- if (state == CompactionState.MAJOR) {
- future.complete(CompactionState.MAJOR_AND_MINOR);
- } else {
- state = CompactionState.MINOR;
- }
- break;
- case NONE:
- default:
+ addListener(getTableHRegionLocations(tableName), (locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ List<CompactionState> regionStates = new ArrayList<>();
+ List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
+ locations.stream().filter(loc -> loc.getServerName() != null)
+ .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
+ .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
+ futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
+ // If any region compaction state is MAJOR_AND_MINOR
+ // the table compaction state is MAJOR_AND_MINOR, too.
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
+ future.complete(regionState);
+ } else {
+ regionStates.add(regionState);
+ }
+ }));
+ });
+ addListener(
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+ (ret, err3) -> {
+ // If future not completed, check all regions's compaction state
+ if (!future.isCompletedExceptionally() && !future.isDone()) {
+ CompactionState state = CompactionState.NONE;
+ for (CompactionState regionState : regionStates) {
+ switch (regionState) {
+ case MAJOR:
+ if (state == CompactionState.MINOR) {
+ future.complete(CompactionState.MAJOR_AND_MINOR);
+ } else {
+ state = CompactionState.MAJOR;
}
- if (!future.isDone()) {
- future.complete(state);
+ break;
+ case MINOR:
+ if (state == CompactionState.MAJOR) {
+ future.complete(CompactionState.MAJOR_AND_MINOR);
+ } else {
+ state = CompactionState.MINOR;
}
- }
+ break;
+ case NONE:
+ default:
}
- });
- });
+ if (!future.isDone()) {
+ future.complete(state);
+ }
+ }
+ }
+ });
+ });
break;
default:
throw new IllegalArgumentException("Unknown compactType: " + compactType);
@@ -2893,37 +2885,38 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
CompletableFuture<CompactionState> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(
this.<GetRegionInfoResponse> newAdminCaller()
- .action(
- (controller, stub) -> this
- .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
- controller, stub, RequestConverter.buildGetRegionInfoRequest(location
- .getRegion().getRegionName(), true), (s, c, req, done) -> s
- .getRegionInfo(controller, req, done), resp -> resp))
- .serverName(serverName).call().whenComplete((resp2, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- if (resp2.hasCompactionState()) {
- future.complete(
<TRUNCATED>
[3/4] hbase git commit: HBASE-17356 Add replica get support
Posted by zh...@apache.org.
HBASE-17356 Add replica get support
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ff23c022
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ff23c022
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ff23c022
Branch: refs/heads/branch-2.0
Commit: ff23c022126ab0cb557bc55cd0b38043b1ed385d
Parents: 4caf2fb
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 1 21:59:37 2019 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Thu Jan 3 10:55:06 2019 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/RegionLocations.java | 30 +-
.../client/AsyncBatchRpcRetryingCaller.java | 114 +-
.../client/AsyncConnectionConfiguration.java | 12 +
.../hbase/client/AsyncConnectionImpl.java | 1 -
.../hbase/client/AsyncMetaRegionLocator.java | 125 +-
.../hbase/client/AsyncNonMetaRegionLocator.java | 291 +--
.../hadoop/hbase/client/AsyncRegionLocator.java | 129 +-
.../hbase/client/AsyncRegionLocatorHelper.java | 147 ++
.../hbase/client/AsyncRpcRetryingCaller.java | 15 +-
.../client/AsyncRpcRetryingCallerFactory.java | 55 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 71 +-
.../hbase/client/AsyncTableRegionLocator.java | 28 +-
.../client/AsyncTableRegionLocatorImpl.java | 6 +-
.../hbase/client/ConnectionConfiguration.java | 5 +-
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 1897 +++++++++---------
.../hadoop/hbase/client/RawAsyncTableImpl.java | 208 +-
.../apache/hadoop/hbase/util/FutureUtils.java | 60 +
.../hbase/client/RegionReplicaTestHelper.java | 161 ++
.../client/TestAsyncMetaRegionLocator.java | 55 +-
.../client/TestAsyncNonMetaRegionLocator.java | 126 +-
...syncNonMetaRegionLocatorConcurrenyLimit.java | 20 +-
...TestAsyncSingleRequestRpcRetryingCaller.java | 56 +-
.../client/TestAsyncTableLocatePrefetch.java | 4 +-
.../client/TestAsyncTableRegionReplicasGet.java | 204 ++
.../hbase/client/TestZKAsyncRegistry.java | 44 +-
25 files changed, 2301 insertions(+), 1563 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index fd6f3c7..f98bf03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -56,8 +56,8 @@ public class RegionLocations {
int index = 0;
for (HRegionLocation loc : locations) {
if (loc != null) {
- if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
- maxReplicaId = loc.getRegionInfo().getReplicaId();
+ if (loc.getRegion().getReplicaId() >= maxReplicaId) {
+ maxReplicaId = loc.getRegion().getReplicaId();
maxReplicaIdIndex = index;
}
}
@@ -72,7 +72,7 @@ public class RegionLocations {
this.locations = new HRegionLocation[maxReplicaId + 1];
for (HRegionLocation loc : locations) {
if (loc != null) {
- this.locations[loc.getRegionInfo().getReplicaId()] = loc;
+ this.locations[loc.getRegion().getReplicaId()] = loc;
}
}
}
@@ -146,7 +146,7 @@ public class RegionLocations {
public RegionLocations remove(HRegionLocation location) {
if (location == null) return this;
if (location.getRegion() == null) return this;
- int replicaId = location.getRegionInfo().getReplicaId();
+ int replicaId = location.getRegion().getReplicaId();
if (replicaId >= locations.length) return this;
// check whether something to remove. HRL.compareTo() compares ONLY the
@@ -203,14 +203,14 @@ public class RegionLocations {
// in case of region replication going down, we might have a leak here.
int max = other.locations.length;
- HRegionInfo regionInfo = null;
+ RegionInfo regionInfo = null;
for (int i = 0; i < max; i++) {
HRegionLocation thisLoc = this.getRegionLocation(i);
HRegionLocation otherLoc = other.getRegionLocation(i);
- if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) {
+ if (regionInfo == null && otherLoc != null && otherLoc.getRegion() != null) {
// regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that
// all replica region infos belong to the same region with same region id.
- regionInfo = otherLoc.getRegionInfo();
+ regionInfo = otherLoc.getRegion();
}
HRegionLocation selectedLoc = selectRegionLocation(thisLoc,
@@ -232,7 +232,7 @@ public class RegionLocations {
for (int i=0; i < newLocations.length; i++) {
if (newLocations[i] != null) {
if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo,
- newLocations[i].getRegionInfo())) {
+ newLocations[i].getRegion())) {
newLocations[i] = null;
}
}
@@ -273,9 +273,9 @@ public class RegionLocations {
boolean checkForEquals, boolean force) {
assert location != null;
- int replicaId = location.getRegionInfo().getReplicaId();
+ int replicaId = location.getRegion().getReplicaId();
- HRegionLocation oldLoc = getRegionLocation(location.getRegionInfo().getReplicaId());
+ HRegionLocation oldLoc = getRegionLocation(location.getRegion().getReplicaId());
HRegionLocation selectedLoc = selectRegionLocation(oldLoc, location,
checkForEquals, force);
@@ -288,8 +288,8 @@ public class RegionLocations {
// ensure that all replicas share the same start code. Otherwise delete them
for (int i=0; i < newLocations.length; i++) {
if (newLocations[i] != null) {
- if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(),
- newLocations[i].getRegionInfo())) {
+ if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegion(),
+ newLocations[i].getRegion())) {
newLocations[i] = null;
}
}
@@ -317,8 +317,8 @@ public class RegionLocations {
public HRegionLocation getRegionLocationByRegionName(byte[] regionName) {
for (HRegionLocation loc : locations) {
if (loc != null) {
- if (Bytes.equals(loc.getRegionInfo().getRegionName(), regionName)
- || Bytes.equals(loc.getRegionInfo().getEncodedNameAsBytes(), regionName)) {
+ if (Bytes.equals(loc.getRegion().getRegionName(), regionName)
+ || Bytes.equals(loc.getRegion().getEncodedNameAsBytes(), regionName)) {
return loc;
}
}
@@ -331,7 +331,7 @@ public class RegionLocations {
}
public HRegionLocation getDefaultRegionLocation() {
- return locations[HRegionInfo.DEFAULT_REPLICA_ID];
+ return locations[RegionInfo.DEFAULT_REPLICA_ID];
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 51b89a9..e268b2e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -23,8 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.ArrayList;
@@ -43,24 +42,26 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Retry caller for batch.
@@ -121,10 +122,10 @@ class AsyncBatchRpcRetryingCaller<T> {
private static final class ServerRequest {
public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
- new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
public void addAction(HRegionLocation loc, Action action) {
- computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
+ computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
() -> new RegionRequest(loc)).actions.add(action);
}
}
@@ -173,11 +174,10 @@ class AsyncBatchRpcRetryingCaller<T> {
Throwable error, ServerName serverName) {
if (tries > startLogErrorsCnt) {
String regions =
- regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
- .collect(Collectors.joining(",", "[", "]"));
- LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
- + " failed, tries=" + tries,
- error);
+ regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
+ .collect(Collectors.joining(",", "[", "]"));
+ LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
+ " failed, tries=" + tries, error);
}
}
@@ -191,7 +191,7 @@ class AsyncBatchRpcRetryingCaller<T> {
errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
}
errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
- getExtraContextForError(serverName)));
+ getExtraContextForError(serverName)));
}
private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
@@ -204,7 +204,7 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
ThrowableWithExtraContext errorWithCtx =
- new ThrowableWithExtraContext(error, currentTime, extras);
+ new ThrowableWithExtraContext(error, currentTime, extras);
List<ThrowableWithExtraContext> errors = removeErrors(action);
if (errors == null) {
errors = Collections.singletonList(errorWithCtx);
@@ -227,7 +227,7 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
future.completeExceptionally(new RetriesExhaustedException(tries,
- Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
+ Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
});
}
@@ -242,9 +242,9 @@ class AsyncBatchRpcRetryingCaller<T> {
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// action list.
- RequestConverter.buildNoDataRegionActions(entry.getKey(),
- entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
- mutationBuilder, nonceGroup, rowMutationsIndexMap);
+ RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
+ multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
+ rowMutationsIndexMap);
}
return multiRequestBuilder.build();
}
@@ -254,15 +254,15 @@ class AsyncBatchRpcRetryingCaller<T> {
RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
if (result == null) {
- LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
- + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
- + regionReq.loc.getRegionInfo().getRegionNameAsString());
+ LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
+ Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
+ regionReq.loc.getRegion().getRegionNameAsString());
addError(action, new RuntimeException("Invalid response"), serverName);
failedActions.add(action);
} else if (result instanceof Throwable) {
Throwable error = translateException((Throwable) result);
logException(tries, () -> Stream.of(regionReq), error, serverName);
- conn.getLocator().updateCachedLocation(regionReq.loc, error);
+ conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
getExtraContextForError(serverName));
@@ -281,20 +281,19 @@ class AsyncBatchRpcRetryingCaller<T> {
RegionResult regionResult = resp.getResults().get(rn);
Throwable regionException = resp.getException(rn);
if (regionResult != null) {
- regionReq.actions.forEach(
- action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
- regionException));
+ regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
+ regionResult, failedActions, regionException));
} else {
Throwable error;
if (regionException == null) {
- LOG.error(
- "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+ LOG
+ .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
error = new RuntimeException("Invalid response");
} else {
error = translateException(regionException);
}
logException(tries, () -> Stream.of(regionReq), error, serverName);
- conn.getLocator().updateCachedLocation(regionReq.loc, error);
+ conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(regionReq.actions.stream(), tries, error, serverName);
return;
@@ -314,8 +313,7 @@ class AsyncBatchRpcRetryingCaller<T> {
remainingNs = remainingTimeNs();
if (remainingNs <= 0) {
failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
- .flatMap(r -> r.actions.stream()),
- tries);
+ .flatMap(r -> r.actions.stream()), tries);
return;
}
} else {
@@ -366,15 +364,15 @@ class AsyncBatchRpcRetryingCaller<T> {
ServerName serverName) {
Throwable error = translateException(t);
logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
- actionsByRegion
- .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));
+ actionsByRegion.forEach(
+ (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
serverName);
return;
}
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries);
}
@@ -407,30 +405,30 @@ class AsyncBatchRpcRetryingCaller<T> {
}
ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
- CompletableFuture.allOf(actions
- .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
- RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
- if (error != null) {
- error = translateException(error);
- if (error instanceof DoNotRetryIOException) {
- failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
- return;
- }
- addError(action, error, null);
- locateFailed.add(action);
- } else {
- computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
- .addAction(loc, action);
+ addListener(CompletableFuture.allOf(actions
+ .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
+ RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
+ if (error != null) {
+ error = translateException(error);
+ if (error instanceof DoNotRetryIOException) {
+ failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
+ return;
}
- }))
- .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
- if (!actionsByServer.isEmpty()) {
- send(actionsByServer, tries);
- }
- if (!locateFailed.isEmpty()) {
- tryResubmit(locateFailed.stream(), tries);
+ addError(action, error, null);
+ locateFailed.add(action);
+ } else {
+ computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
+ action);
}
- });
+ }))
+ .toArray(CompletableFuture[]::new)), (v, r) -> {
+ if (!actionsByServer.isEmpty()) {
+ send(actionsByServer, tries);
+ }
+ if (!locateFailed.isEmpty()) {
+ tryResubmit(locateFailed.stream(), tries);
+ }
+ });
}
public List<CompletableFuture<T>> call() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 915e9dd..fa051a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@@ -94,6 +96,10 @@ class AsyncConnectionConfiguration {
private final long writeBufferPeriodicFlushTimeoutNs;
+ // this is for supporting region replica get, if the primary does not finished within this
+ // timeout, we will send request to secondaries.
+ private final long primaryCallTimeoutNs;
+
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -124,6 +130,8 @@ class AsyncConnectionConfiguration {
this.writeBufferPeriodicFlushTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
+ this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
+ conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@@ -181,4 +189,8 @@ class AsyncConnectionConfiguration {
long getWriteBufferPeriodicFlushTimeoutNs() {
return writeBufferPeriodicFlushTimeoutNs;
}
+
+ long getPrimaryCallTimeoutNs() {
+ return primaryCallTimeoutNs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index b26f6b2..507bfc3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -136,7 +136,6 @@ class AsyncConnectionImpl implements AsyncConnection {
}
// we will override this method for testing retry caller, so do not remove this method.
- @VisibleForTesting
AsyncRegionLocator getLocator() {
return locator;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 06b5b57..9fef15d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,43 +41,43 @@ class AsyncMetaRegionLocator {
private final AsyncRegistry registry;
- private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+ private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
- private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
- new AtomicReference<>();
+ private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
+ new AtomicReference<>();
AsyncMetaRegionLocator(AsyncRegistry registry) {
this.registry = registry;
}
- CompletableFuture<HRegionLocation> getRegionLocation(boolean reload) {
+ /**
+ * Get the region locations for meta region. If the location for the given replica is not
+ * available in the cached locations, then fetch from the HBase cluster.
+ * <p/>
+ * The <code>replicaId</code> parameter is important. If the region replication config for meta
+ * region is changed, then the cached region locations may not have the locations for new
+ * replicas. If we do not check the location for the given replica, we will always return the
+ * cached region locations and cause an infinite loop.
+ */
+ CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
for (;;) {
if (!reload) {
- HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
- if (metaRegionLocation != null) {
- return CompletableFuture.completedFuture(metaRegionLocation);
+ RegionLocations locs = this.metaRegionLocations.get();
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
}
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Meta region location cache is null, try fetching from registry.");
- }
+ LOG.trace("Meta region location cache is null, try fetching from registry.");
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start fetching meta region location from registry.");
- }
- CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+ LOG.debug("Start fetching meta region location from registry.");
+ CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
registry.getMetaRegionLocation().whenComplete((locs, error) -> {
if (error != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to fetch meta region location from registry", error);
- }
+ LOG.debug("Failed to fetch meta region location from registry", error);
metaRelocateFuture.getAndSet(null).completeExceptionally(error);
return;
}
- HRegionLocation loc = locs.getDefaultRegionLocation();
- if (LOG.isDebugEnabled()) {
- LOG.debug("The fetched meta region location is " + loc);
- }
+ LOG.debug("The fetched meta region location is {}" + locs);
// Here we update cache before reset future, so it is possible that someone can get a
// stale value. Consider this:
// 1. update cache
@@ -82,12 +87,12 @@ class AsyncMetaRegionLocator {
// cleared in step 2.
// But we do not think it is a big deal as it rarely happens, and even if it happens, the
// caller will retry again later, no correctness problems.
- this.metaRegionLocation.set(loc);
+ this.metaRegionLocations.set(locs);
metaRelocateFuture.set(null);
- future.complete(loc);
+ future.complete(locs);
});
} else {
- CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+ CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
if (future != null) {
return future;
}
@@ -95,30 +100,56 @@ class AsyncMetaRegionLocator {
}
}
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
- AsyncRegionLocator.updateCachedLocation(loc, exception, l -> metaRegionLocation.get(),
- newLoc -> {
- for (;;) {
- HRegionLocation oldLoc = metaRegionLocation.get();
- if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() ||
- oldLoc.getServerName().equals(newLoc.getServerName()))) {
- return;
- }
- if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
- return;
- }
- }
- }, l -> {
- for (;;) {
- HRegionLocation oldLoc = metaRegionLocation.get();
- if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
- return;
- }
+ private HRegionLocation getCacheLocation(HRegionLocation loc) {
+ RegionLocations locs = metaRegionLocations.get();
+ return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+ }
+
+ private void addLocationToCache(HRegionLocation loc) {
+ for (;;) {
+ int replicaId = loc.getRegion().getReplicaId();
+ RegionLocations oldLocs = metaRegionLocations.get();
+ if (oldLocs == null) {
+ RegionLocations newLocs = createRegionLocations(loc);
+ if (metaRegionLocations.compareAndSet(null, newLocs)) {
+ return;
}
- });
+ }
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
+ if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
+ oldLoc.getServerName().equals(loc.getServerName()))) {
+ return;
+ }
+ RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
+ if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+ return;
+ }
+ }
+ }
+
+ private void removeLocationFromCache(HRegionLocation loc) {
+ for (;;) {
+ RegionLocations oldLocs = metaRegionLocations.get();
+ if (oldLocs == null) {
+ return;
+ }
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+ if (!canUpdateOnError(loc, oldLoc)) {
+ return;
+ }
+ RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+ if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+ return;
+ }
+ }
+ }
+
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
+ this::addLocationToCache, this::removeLocationFromCache);
}
void clearCache() {
- metaRegionLocation.set(null);
+ metaRegionLocations.set(null);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 7e3d56c..1fcfbb0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.NINES;
import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
@@ -39,7 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -53,6 +60,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
/**
* The asynchronous locator for regions other than meta.
@@ -83,9 +91,9 @@ class AsyncNonMetaRegionLocator {
private static final class LocateRequest {
- public final byte[] row;
+ private final byte[] row;
- public final RegionLocateType locateType;
+ private final RegionLocateType locateType;
public LocateRequest(byte[] row, RegionLocateType locateType) {
this.row = row;
@@ -109,12 +117,12 @@ class AsyncNonMetaRegionLocator {
private static final class TableCache {
- public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
+ private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
- public final Set<LocateRequest> pendingRequests = new HashSet<>();
+ private final Set<LocateRequest> pendingRequests = new HashSet<>();
- public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
+ private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
new LinkedHashMap<>();
public boolean hasQuota(int max) {
@@ -133,25 +141,29 @@ class AsyncNonMetaRegionLocator {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}
- public void clearCompletedRequests(Optional<HRegionLocation> location) {
- for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
+ public void clearCompletedRequests(Optional<RegionLocations> locations) {
+ for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
- if (tryComplete(entry.getKey(), entry.getValue(), location)) {
+ Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
+ if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
iter.remove();
}
}
}
- private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
- Optional<HRegionLocation> location) {
+ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+ Optional<RegionLocations> locations) {
if (future.isDone()) {
return true;
}
- if (!location.isPresent()) {
+ if (!locations.isPresent()) {
return false;
}
- HRegionLocation loc = location.get();
+ RegionLocations locs = locations.get();
+ HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
+ // we should at least have one location available, otherwise the request should fail and
+ // should not arrive here
+ assert loc != null;
boolean completed;
if (req.locateType.equals(RegionLocateType.BEFORE)) {
// for locating the row before current row, the common case is to find the previous region
@@ -166,7 +178,7 @@ class AsyncNonMetaRegionLocator {
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
- future.complete(loc);
+ future.complete(locs);
return true;
} else {
return false;
@@ -186,59 +198,59 @@ class AsyncNonMetaRegionLocator {
return computeIfAbsent(cache, tableName, TableCache::new);
}
- private void removeFromCache(HRegionLocation loc) {
- TableCache tableCache = cache.get(loc.getRegion().getTable());
- if (tableCache == null) {
- return;
+ private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+ HRegionLocation[] locArr1 = locs1.getRegionLocations();
+ HRegionLocation[] locArr2 = locs2.getRegionLocations();
+ if (locArr1.length != locArr2.length) {
+ return false;
}
- tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> {
- if (oldLoc.getSeqNum() > loc.getSeqNum() ||
- !oldLoc.getServerName().equals(loc.getServerName())) {
- return oldLoc;
+ for (int i = 0; i < locArr1.length; i++) {
+ // do not need to compare region info
+ HRegionLocation loc1 = locArr1[i];
+ HRegionLocation loc2 = locArr2[i];
+ if (loc1 == null) {
+ if (loc2 != null) {
+ return false;
+ }
+ } else {
+ if (loc2 == null) {
+ return false;
+ }
+ if (loc1.getSeqNum() != loc2.getSeqNum()) {
+ return false;
+ }
+ if (Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+ return false;
+ }
}
- return null;
- });
+ }
+ return true;
}
// return whether we add this loc to cache
- private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try adding " + loc + " to cache");
- }
- byte[] startKey = loc.getRegion().getStartKey();
- HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
- if (oldLoc == null) {
- return true;
- }
- if (oldLoc.getSeqNum() > loc.getSeqNum() ||
- oldLoc.getServerName().equals(loc.getServerName())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc +
- " is newer than us or has the same server name");
- }
- return false;
- }
- return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
- if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
- return loc;
+ private boolean addToCache(TableCache tableCache, RegionLocations locs) {
+ LOG.trace("Try adding {} to cache", locs);
+ byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
+ if (oldLocs == null) {
+ return true;
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue +
+ RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs);
+ if (isEqual(mergedLocs, oldLocs)) {
+ // the merged one is the same with the old one, give up
+ LOG.trace("Will not add {} to cache because the old value {} " +
" is newer than us or has the same server name." +
- " Maybe it is updated before we replace it");
+ " Maybe it is updated before we replace it", locs, oldLocs);
+ return false;
}
- return oldValue;
- });
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "Called by lambda expression")
- private void addToCache(HRegionLocation loc) {
- addToCache(getTableCache(loc.getRegion().getTable()), loc);
- LOG.trace("Try adding {} to cache", loc);
+ if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
+ return true;
+ }
+ }
}
- private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
+ private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
Throwable error) {
if (error != null) {
LOG.warn("Failed to locate region in '" + tableName + "', row='" +
@@ -246,8 +258,8 @@ class AsyncNonMetaRegionLocator {
}
Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
- if (loc != null) {
- if (!addToCache(tableCache, loc)) {
+ if (locs != null) {
+ if (!addToCache(tableCache, locs)) {
// someone is ahead of us.
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
@@ -269,7 +281,7 @@ class AsyncNonMetaRegionLocator {
future.completeExceptionally(error);
}
}
- tableCache.clearCompletedRequests(Optional.ofNullable(loc));
+ tableCache.clearCompletedRequests(Optional.ofNullable(locs));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
@@ -286,9 +298,11 @@ class AsyncNonMetaRegionLocator {
Bytes.toStringBinary(req.row), req.locateType, locs);
}
+ // the default region location should always be presented when fetching from meta, otherwise
+ // let's fail the request.
if (locs == null || locs.getDefaultRegionLocation() == null) {
complete(tableName, req, null,
- new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
+ new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
tableName, Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
@@ -296,58 +310,60 @@ class AsyncNonMetaRegionLocator {
RegionInfo info = loc.getRegion();
if (info == null) {
complete(tableName, req, null,
- new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+ new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
tableName, Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
if (info.isSplitParent()) {
return false;
}
- if (loc.getServerName() == null) {
- complete(tableName, req, null,
- new IOException(
- String.format("No server address listed for region '%s', row='%s', locateType=%s",
- info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
- return true;
- }
- complete(tableName, req, loc, null);
+ complete(tableName, req, locs, null);
return true;
}
- private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
- Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
+ private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
+ int replicaId) {
+ Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
if (entry == null) {
return null;
}
- HRegionLocation loc = entry.getValue();
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
byte[] endKey = loc.getRegion().getEndKey();
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
- Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
- return loc;
+ return locs;
} else {
return null;
}
}
- private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
- byte[] row) {
+ private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
+ byte[] row, int replicaId) {
boolean isEmptyStopRow = isEmptyStopRow(row);
- Map.Entry<byte[], HRegionLocation> entry =
- isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
+ Map.Entry<byte[], RegionLocations> entry =
+ isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
if (entry == null) {
return null;
}
- HRegionLocation loc = entry.getValue();
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
(!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
- Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
- return loc;
+ return locs;
} else {
return null;
}
@@ -390,8 +406,8 @@ class AsyncNonMetaRegionLocator {
if (tableNotFound) {
complete(tableName, req, null, new TableNotFoundException(tableName));
} else if (!completeNormally) {
- complete(tableName, req, null, new IOException(
- "Unable to find region for " + Bytes.toStringBinary(req.row) + " in " + tableName));
+ complete(tableName, req, null, new IOException("Unable to find region for '" +
+ Bytes.toStringBinary(req.row) + "' in " + tableName));
}
}
@@ -423,13 +439,12 @@ class AsyncNonMetaRegionLocator {
continue;
}
RegionInfo info = loc.getRegion();
- if (info == null || info.isOffline() || info.isSplitParent() ||
- loc.getServerName() == null) {
+ if (info == null || info.isOffline() || info.isSplitParent()) {
continue;
}
- if (addToCache(tableCache, loc)) {
+ if (addToCache(tableCache, locs)) {
synchronized (tableCache) {
- tableCache.clearCompletedRequests(Optional.of(loc));
+ tableCache.clearCompletedRequests(Optional.of(locs));
}
}
}
@@ -438,36 +453,36 @@ class AsyncNonMetaRegionLocator {
});
}
- private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
- RegionLocateType locateType) {
+ private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
+ int replicaId, RegionLocateType locateType) {
return locateType.equals(RegionLocateType.BEFORE)
- ? locateRowBeforeInCache(tableCache, tableName, row)
- : locateRowInCache(tableCache, tableName, row);
+ ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
+ : locateRowInCache(tableCache, tableName, row, replicaId);
}
// locateToPrevious is true means we will use the start key of a region to locate the region
// placed before it. Used for reverse scan. See the comment of
// AsyncRegionLocator.getPreviousRegionLocation.
- private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
- byte[] row, RegionLocateType locateType, boolean reload) {
+ private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
+ byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
// AFTER should be convert to CURRENT before calling this method
assert !locateType.equals(RegionLocateType.AFTER);
TableCache tableCache = getTableCache(tableName);
if (!reload) {
- HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
- if (loc != null) {
- return CompletableFuture.completedFuture(loc);
+ RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
}
}
- CompletableFuture<HRegionLocation> future;
+ CompletableFuture<RegionLocations> future;
LocateRequest req;
boolean sendRequest = false;
synchronized (tableCache) {
// check again
if (!reload) {
- HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
- if (loc != null) {
- return CompletableFuture.completedFuture(loc);
+ RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+ if (isGood(locs, replicaId)) {
+ return CompletableFuture.completedFuture(locs);
}
}
req = new LocateRequest(row, locateType);
@@ -487,28 +502,58 @@ class AsyncNonMetaRegionLocator {
return future;
}
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType locateType, boolean reload) {
- if (locateType.equals(RegionLocateType.BEFORE)) {
- return getRegionLocationInternal(tableName, row, locateType, reload);
- } else {
- // as we know the exact row after us, so we can just create the new row, and use the same
- // algorithm to locate it.
- if (locateType.equals(RegionLocateType.AFTER)) {
- row = createClosestRowAfter(row);
- }
- return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload);
+ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+ int replicaId, RegionLocateType locateType, boolean reload) {
+ // as we know the exact row after us, so we can just create the new row, and use the same
+ // algorithm to locate it.
+ if (locateType.equals(RegionLocateType.AFTER)) {
+ row = createClosestRowAfter(row);
+ locateType = RegionLocateType.CURRENT;
}
+ return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
}
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
- AsyncRegionLocator.updateCachedLocation(loc, exception, l -> {
- TableCache tableCache = cache.get(l.getRegion().getTable());
- if (tableCache == null) {
- return null;
+ private void removeLocationFromCache(HRegionLocation loc) {
+ TableCache tableCache = cache.get(loc.getRegion().getTable());
+ if (tableCache == null) {
+ return;
+ }
+ byte[] startKey = loc.getRegion().getStartKey();
+ for (;;) {
+ RegionLocations oldLocs = tableCache.cache.get(startKey);
+ HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+ if (!canUpdateOnError(loc, oldLoc)) {
+ return;
}
- return tableCache.cache.get(l.getRegion().getStartKey());
- }, this::addToCache, this::removeFromCache);
+ RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+ if (newLocs == null) {
+ if (tableCache.cache.remove(startKey, oldLocs)) {
+ return;
+ }
+ } else {
+ if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
+ return;
+ }
+ }
+ }
+ }
+
+ private void addLocationToCache(HRegionLocation loc) {
+ addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
+ }
+
+ private HRegionLocation getCachedLocation(HRegionLocation loc) {
+ TableCache tableCache = cache.get(loc.getRegion().getTable());
+ if (tableCache == null) {
+ return null;
+ }
+ RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
+ return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+ }
+
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+ AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+ this::addLocationToCache, this::removeLocationFromCache);
}
void clearCache(TableName tableName) {
@@ -526,11 +571,11 @@ class AsyncNonMetaRegionLocator {
// only used for testing whether we have cached the location for a region.
@VisibleForTesting
- HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) {
+ RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
TableCache tableCache = cache.get(tableName);
if (tableCache == null) {
return null;
}
- return locateRowInCache(tableCache, tableName, row);
+ return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 56228ab..d624974 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -18,26 +18,24 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.Supplier;
-
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
/**
* The asynchronous region locator.
@@ -59,8 +57,8 @@ class AsyncRegionLocator {
this.retryTimer = retryTimer;
}
- private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
- long timeoutNs, Supplier<String> timeoutMsg) {
+ private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
+ Supplier<String> timeoutMsg) {
if (future.isDone() || timeoutNs <= 0) {
return future;
}
@@ -78,74 +76,75 @@ class AsyncRegionLocator {
});
}
- CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ private boolean isMeta(TableName tableName) {
+ return TableName.isMetaTableName(tableName);
+ }
+
+ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
+ CompletableFuture<RegionLocations> future = isMeta(tableName)
+ ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
+ : nonMetaRegionLocator.getRegionLocations(tableName, row,
+ RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+ return withTimeout(future, timeoutNs,
+ () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+ "ms) waiting for region locations for " + tableName + ", row='" +
+ Bytes.toStringBinary(row) + "'");
+ }
+
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
- CompletableFuture<HRegionLocation> future =
- tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload)
- : nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload);
+ CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+ CompletableFuture<RegionLocations> locsFuture =
+ isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
+ : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+ addListener(locsFuture, (locs, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ future
+ .completeExceptionally(new RegionException("No location for " + tableName + ", row='" +
+ Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
+ } else if (loc.getServerName() == null) {
+ future.completeExceptionally(new HBaseIOException("No server address listed for region '" +
+ loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
+ "', locateType=" + type + ", replicaId=" + replicaId));
+ } else {
+ future.complete(loc);
+ }
+ });
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
- "ms) waiting for region location for " + tableName + ", row='" +
- Bytes.toStringBinary(row) + "'");
+ "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) +
+ "', replicaId=" + replicaId);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
- RegionLocateType type, long timeoutNs) {
- return getRegionLocation(tableName, row, type, false, timeoutNs);
+ int replicaId, RegionLocateType type, long timeoutNs) {
+ return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
}
- static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
- // Do not need to update if no such location, or the location is newer, or the location is not
- // same with us
- return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
- oldLoc.getServerName().equals(loc.getServerName());
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ RegionLocateType type, boolean reload, long timeoutNs) {
+ return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
+ timeoutNs);
}
- static void updateCachedLocation(HRegionLocation loc, Throwable exception,
- Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
- Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
- HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
- }
- if (!canUpdate(loc, oldLoc)) {
- return;
- }
- Throwable cause = findException(exception);
- if (LOG.isDebugEnabled()) {
- LOG.debug("The actual exception when updating " + loc, cause);
- }
- if (cause == null || !isMetaClearingException(cause)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Will not update " + loc + " because the exception is null or not the one we care about");
- }
- return;
- }
- if (cause instanceof RegionMovedException) {
- RegionMovedException rme = (RegionMovedException) cause;
- HRegionLocation newLoc =
- new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
- }
- addToCache.accept(newLoc);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try removing " + loc + " from cache");
- }
- removeFromCache.accept(loc);
- }
+ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+ RegionLocateType type, long timeoutNs) {
+ return getRegionLocation(tableName, row, type, false, timeoutNs);
}
- void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
if (loc.getRegion().isMetaRegion()) {
- metaRegionLocator.updateCachedLocation(loc, exception);
+ metaRegionLocator.updateCachedLocationOnError(loc, exception);
} else {
- nonMetaRegionLocator.updateCachedLocation(loc, exception);
+ nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
new file mode 100644
index 0000000..5c9c091
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for asynchronous region locator.
+ */
+@InterfaceAudience.Private
+final class AsyncRegionLocatorHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocatorHelper.class);
+
+ private AsyncRegionLocatorHelper() {
+ }
+
+ static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
+ // Do not need to update if no such location, or the location is newer, or the location is not
+ // the same with us
+ return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
+ oldLoc.getServerName().equals(loc.getServerName());
+ }
+
+ static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
+ Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+ Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
+ HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
+ LOG.debug("Try updating {} , the old value is {}", loc, oldLoc, exception);
+ if (!canUpdateOnError(loc, oldLoc)) {
+ return;
+ }
+ Throwable cause = findException(exception);
+ LOG.debug("The actual exception when updating {}", loc, cause);
+ if (cause == null || !isMetaClearingException(cause)) {
+ LOG.debug("Will not update {} because the exception is null or not the one we care about",
+ loc);
+ return;
+ }
+ if (cause instanceof RegionMovedException) {
+ RegionMovedException rme = (RegionMovedException) cause;
+ HRegionLocation newLoc =
+ new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum());
+ LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme);
+ addToCache.accept(newLoc);
+ } else {
+ LOG.debug("Try removing {} from cache", loc);
+ removeFromCache.accept(loc);
+ }
+ }
+
+ static RegionLocations createRegionLocations(HRegionLocation loc) {
+ int replicaId = loc.getRegion().getReplicaId();
+ HRegionLocation[] locs = new HRegionLocation[replicaId + 1];
+ locs[replicaId] = loc;
+ return new RegionLocations(locs);
+ }
+
+ /**
+ * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and replace the
+ * location for the given {@code replicaId} with the given {@code loc}.
+ * <p/>
+ * All the {@link RegionLocations} in async locator related class are immutable because we want to
+ * access them concurrently, so here we need to create a new one, instead of calling
+ * {@link RegionLocations#updateLocation(HRegionLocation, boolean, boolean)}.
+ */
+ static RegionLocations replaceRegionLocation(RegionLocations oldLocs, HRegionLocation loc) {
+ int replicaId = loc.getRegion().getReplicaId();
+ HRegionLocation[] locs = oldLocs.getRegionLocations();
+ locs = Arrays.copyOf(locs, Math.max(replicaId + 1, locs.length));
+ locs[replicaId] = loc;
+ return new RegionLocations(locs);
+ }
+
+ /**
+ * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and remove the
+ * location for the given {@code replicaId}.
+ * <p/>
+ * All the {@link RegionLocations} in async locator related class are immutable because we want to
+ * access them concurrently, so here we need to create a new one, instead of calling
+ * {@link RegionLocations#remove(int)}.
+ */
+ static RegionLocations removeRegionLocation(RegionLocations oldLocs, int replicaId) {
+ HRegionLocation[] locs = oldLocs.getRegionLocations();
+ if (locs.length < replicaId + 1) {
+ // Here we do not modify the oldLocs so it is safe to return it.
+ return oldLocs;
+ }
+ locs = Arrays.copyOf(locs, locs.length);
+ locs[replicaId] = null;
+ if (ObjectUtils.firstNonNull(locs) != null) {
+ return new RegionLocations(locs);
+ } else {
+ // if all the locations are null, just return null
+ return null;
+ }
+ }
+
+ /**
+ * Create a new {@link RegionLocations} which is the merging result for the given two
+ * {@link RegionLocations}.
+ * <p/>
+ * All the {@link RegionLocations} in async locator related class are immutable because we want to
+ * access them concurrently, so here we need to create a new one, instead of calling
+ * {@link RegionLocations#mergeLocations(RegionLocations)} directly.
+ */
+ static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) {
+ RegionLocations locs = new RegionLocations(newLocs.getRegionLocations());
+ locs.mergeLocations(oldLocs);
+ return locs;
+ }
+
+ static boolean isGood(RegionLocations locs, int replicaId) {
+ if (locs == null) {
+ return false;
+ }
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ return loc != null && loc.getServerName() != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index d30012f..e03049a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -88,15 +88,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}
- protected long remainingTimeNs() {
+ protected final long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
- protected void completeExceptionally() {
+ protected final void completeExceptionally() {
future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
}
- protected void resetCallTimeout() {
+ protected final void resetCallTimeout() {
long callTimeoutNs;
if (operationTimeoutNs > 0) {
callTimeoutNs = remainingTimeNs();
@@ -111,8 +111,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
resetController(controller, callTimeoutNs);
}
- protected void onError(Throwable error, Supplier<String> errMsg,
+ protected final void onError(Throwable error, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
+ if (future.isDone()) {
+ // Give up if the future is already done, this is possible if user has already canceled the
+ // future. And for timeline consistent read, we will also cancel some requests if we have
+ // already get one of the responses.
+ LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
+ return;
+ }
error = translateException(error);
if (error instanceof DoNotRetryIOException) {
future.completeExceptionally(error);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index f80b4e5..a660e74 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,22 +17,22 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@@ -75,6 +75,8 @@ class AsyncRpcRetryingCallerFactory {
private RegionLocateType locateType = RegionLocateType.CURRENT;
+ private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@@ -121,11 +123,17 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
+ this.replicaId = replicaId;
+ return this;
+ }
+
public AsyncSingleRequestRpcRetryingCaller<T> build() {
+ checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
- checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
- checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
- pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
+ checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
+ pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -241,11 +249,11 @@ class AsyncRpcRetryingCallerFactory {
public AsyncScanSingleRegionRpcRetryingCaller build() {
checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
- checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
- checkNotNull(resultCache, "resultCache is null"),
- checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
- checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
- pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
+ checkNotNull(resultCache, "resultCache is null"),
+ checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
+ checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
+ pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -311,7 +319,7 @@ class AsyncRpcRetryingCallerFactory {
public <T> AsyncBatchRpcRetryingCaller<T> build() {
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
- maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
public <T> List<CompletableFuture<T>> call() {
@@ -363,8 +371,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncMasterRequestRpcRetryingCaller<T> build() {
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
- checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
- rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -390,7 +398,8 @@ class AsyncRpcRetryingCallerFactory {
private ServerName serverName;
- public AdminRequestCallerBuilder<T> action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
+ public AdminRequestCallerBuilder<T> action(
+ AsyncAdminRequestRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@@ -420,15 +429,15 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public AdminRequestCallerBuilder<T> serverName(ServerName serverName){
+ public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
this.serverName = serverName;
return this;
}
public AsyncAdminRequestRetryingCaller<T> build() {
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
- operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
- "serverName is null"), checkNotNull(callable, "action is null"));
+ operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+ checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}
public CompletableFuture<T> call() {
@@ -436,7 +445,7 @@ class AsyncRpcRetryingCallerFactory {
}
}
- public <T> AdminRequestCallerBuilder<T> adminRequest(){
+ public <T> AdminRequestCallerBuilder<T> adminRequest() {
return new AdminRequestCallerBuilder<>();
}
@@ -488,8 +497,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncServerRequestRpcRetryingCaller<T> build() {
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
- operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
- "serverName is null"), checkNotNull(callable, "action is null"));
+ operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+ checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}
public CompletableFuture<T> call() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 56c82fb..1a52e5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,17 +17,19 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
* Retry caller for a single request, such as get, put, delete, etc.
@@ -45,18 +47,21 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
private final byte[] row;
+ private final int replicaId;
+
private final RegionLocateType locateType;
private final Callable<T> callable;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
- long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
- int startLogErrorsCnt) {
+ TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
+ Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
+ this.replicaId = replicaId;
this.locateType = locateType;
this.callable = callable;
}
@@ -67,23 +72,22 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
stub = conn.getRegionServerStub(loc.getServerName());
} catch (IOException e) {
onError(e,
- () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
- + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
- err -> conn.getLocator().updateCachedLocation(loc, err));
+ () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) +
+ "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
return;
}
resetCallTimeout();
- callable.call(controller, loc, stub).whenComplete(
- (result, error) -> {
- if (error != null) {
- onError(error,
- () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
- + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
- err -> conn.getLocator().updateCachedLocation(loc, err));
- return;
- }
- future.complete(result);
- });
+ callable.call(controller, loc, stub).whenComplete((result, error) -> {
+ if (error != null) {
+ onError(error,
+ () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
+ loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+ return;
+ }
+ future.complete(result);
+ });
}
@Override
@@ -98,18 +102,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
} else {
locateTimeoutNs = -1L;
}
- conn.getLocator()
- .getRegionLocation(tableName, row, locateType, locateTimeoutNs)
- .whenComplete(
- (loc, error) -> {
- if (error != null) {
- onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
- + " failed", err -> {
- });
- return;
- }
- call(loc);
- });
+ addListener(
+ conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs),
+ (loc, error) -> {
+ if (error != null) {
+ onError(error,
+ () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
+ });
+ return;
+ }
+ call(loc);
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index dbfcef5..3bda38e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
-
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -55,5 +54,30 @@ public interface AsyncTableRegionLocator {
* @param row Row to find.
* @param reload true to reload information or false to use cached information
*/
- CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload);
+ default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
+ return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload);
+ }
+
+ /**
+ * Finds the region with the given <code>replicaId</code> on which the given row is being served.
+ * <p>
+ * Returns the location of the region with the given <code>replicaId</code> to which the row
+ * belongs.
+ * @param row Row to find.
+ * @param replicaId the replica id of the region
+ */
+ default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId) {
+ return getRegionLocation(row, replicaId, false);
+ }
+
+ /**
+ * Finds the region with the given <code>replicaId</code> on which the given row is being served.
+ * <p>
+ * Returns the location of the region with the given <code>replicaId</code> to which the row
+ * belongs.
+ * @param row Row to find.
+ * @param replicaId the replica id of the region
+ * @param reload true to reload information or false to use cached information
+ */
+ CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 7d199df..465a411 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -44,7 +44,9 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
}
@Override
- public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
- return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L);
+ public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
+ boolean reload) {
+ return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
+ -1L);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index d996004..55c62e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -38,6 +38,9 @@ public class ConnectionConfiguration {
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+ public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
+ "hbase.client.primaryCallTimeout.get";
+ public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
@@ -86,7 +89,7 @@ public class ConnectionConfiguration {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.primaryCallTimeoutMicroSecond =
- conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms
+ conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
this.replicaCallTimeoutMicroSecondScan =
conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
[4/4] hbase git commit: HBASE-18569 Add prefetch support for async
region locator
Posted by zh...@apache.org.
HBASE-18569 Add prefetch support for async region locator
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4caf2fb0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4caf2fb0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4caf2fb0
Branch: refs/heads/branch-2.0
Commit: 4caf2fb0d848821d96b98dc449589eae0124b2b7
Parents: 40d2787
Author: zhangduo <zh...@apache.org>
Authored: Fri Jun 22 08:48:33 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Thu Jan 3 10:55:06 2019 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncNonMetaRegionLocator.java | 75 +++++++++++++++---
.../client/TestAsyncTableLocatePrefetch.java | 82 ++++++++++++++++++++
2 files changed, 145 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4caf2fb0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index f6d74a5..7e3d56c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -52,6 +52,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
/**
* The asynchronous locator for regions other than meta.
*/
@@ -60,15 +62,23 @@ class AsyncNonMetaRegionLocator {
private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
+ @VisibleForTesting
static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
"hbase.client.meta.max.concurrent.locate.per.table";
private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
+ @VisibleForTesting
+ static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
+
+ private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
+
private final AsyncConnectionImpl conn;
private final int maxConcurrentLocateRequestPerTable;
+ private final int locatePrefetchLimit;
+
private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
private static final class LocateRequest {
@@ -168,6 +178,8 @@ class AsyncNonMetaRegionLocator {
this.conn = conn;
this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+ this.locatePrefetchLimit =
+ conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
}
private TableCache getTableCache(TableName tableName) {
@@ -223,9 +235,7 @@ class AsyncNonMetaRegionLocator {
justification = "Called by lambda expression")
private void addToCache(HRegionLocation loc) {
addToCache(getTableCache(loc.getRegion().getTable()), loc);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Try adding " + loc + " to cache");
- }
+ LOG.trace("Try adding {} to cache", loc);
}
private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
@@ -271,8 +281,10 @@ class AsyncNonMetaRegionLocator {
// return whether we should stop the scan
private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
RegionLocations locs = MetaTableAccessor.getRegionLocations(result);
- LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
- Bytes.toStringBinary(req.row), req.locateType, locs);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
+ Bytes.toStringBinary(req.row), req.locateType, locs);
+ }
if (locs == null || locs.getDefaultRegionLocation() == null) {
complete(tableName, req, null,
@@ -294,8 +306,8 @@ class AsyncNonMetaRegionLocator {
if (loc.getServerName() == null) {
complete(tableName, req, null,
new IOException(
- String.format("No server address listed for region '%s', row='%s', locateType=%s",
- info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
+ String.format("No server address listed for region '%s', row='%s', locateType=%s",
+ info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
complete(tableName, req, loc, null);
@@ -361,7 +373,7 @@ class AsyncNonMetaRegionLocator {
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
conn.getTable(META_TABLE_NAME)
.scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
- .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
+ .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
.setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() {
private boolean completeNormally = false;
@@ -385,12 +397,41 @@ class AsyncNonMetaRegionLocator {
@Override
public void onNext(Result[] results, ScanController controller) {
- for (Result result : results) {
- tableNotFound = false;
- if (onScanNext(tableName, req, result)) {
+ if (results.length == 0) {
+ return;
+ }
+ tableNotFound = false;
+ int i = 0;
+ for (; i < results.length; i++) {
+ if (onScanNext(tableName, req, results[i])) {
completeNormally = true;
controller.terminate();
- return;
+ i++;
+ break;
+ }
+ }
+ // Add the remaining results into cache
+ if (i < results.length) {
+ TableCache tableCache = getTableCache(tableName);
+ for (; i < results.length; i++) {
+ RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
+ if (locs == null) {
+ continue;
+ }
+ HRegionLocation loc = locs.getDefaultRegionLocation();
+ if (loc == null) {
+ continue;
+ }
+ RegionInfo info = loc.getRegion();
+ if (info == null || info.isOffline() || info.isSplitParent() ||
+ loc.getServerName() == null) {
+ continue;
+ }
+ if (addToCache(tableCache, loc)) {
+ synchronized (tableCache) {
+ tableCache.clearCompletedRequests(Optional.of(loc));
+ }
+ }
}
}
}
@@ -482,4 +523,14 @@ class AsyncNonMetaRegionLocator {
}
}
}
+
+ // only used for testing whether we have cached the location for a region.
+ @VisibleForTesting
+ HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) {
+ TableCache tableCache = cache.get(tableName);
+ if (tableCache == null) {
+ return null;
+ }
+ return locateRowInCache(tableCache, tableName, row);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4caf2fb0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
new file mode 100644
index 0000000..13d8000
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableLocatePrefetch {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableLocatePrefetch.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static AsyncConnection CONN;
+
+ private static AsyncNonMetaRegionLocator LOCATOR;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(AsyncNonMetaRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ Closeables.close(CONN, true);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void test() throws InterruptedException, ExecutionException {
+ assertNotNull(LOCATOR
+ .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get());
+ // we finish the request before we adding the remaining results to cache so sleep a bit here
+ Thread.sleep(1000);
+ // confirm that the locations of all the regions have been cached.
+ assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, Bytes.toBytes("aaa")));
+ for (byte[] row : HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE) {
+ assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, row));
+ }
+ }
+}