You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/25 12:38:02 UTC
[1/2] hbase git commit: HBASE-17345 Implement batch
Repository: hbase
Updated Branches:
refs/heads/master 8da7366fc -> 8fa5b0b94
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
deleted file mode 100644
index 612e830..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableMultiGet {
-
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("async");
-
- private static byte[] FAMILY = Bytes.toBytes("cf");
-
- private static byte[] CQ = Bytes.toBytes("cq");
-
- private static int COUNT = 100;
-
- private static AsyncConnection ASYNC_CONN;
-
- @Parameter
- public Supplier<AsyncTableBase> getTable;
-
- /** Returns the table obtained from {@code ASYNC_CONN.getRawTable} for {@code TABLE_NAME}. */
- private static RawAsyncTable getRawTable() {
-   RawAsyncTable rawTable = ASYNC_CONN.getRawTable(TABLE_NAME);
-   return rawTable;
- }
-
- /**
-  * Returns the table obtained from {@code ASYNC_CONN.getTable} for {@code TABLE_NAME}, passing the
-  * common fork-join pool as the second argument.
-  */
- private static AsyncTable getTable() {
-   AsyncTable pooledTable = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
-   return pooledTable;
- }
-
- @Parameters
- public static List<Object[]> params() {
- return Arrays.asList(new Supplier<?>[] { TestAsyncTableMultiGet::getRawTable },
- new Supplier<?>[] { TestAsyncTableMultiGet::getTable });
- }
-
- /**
-  * Starts a three node mini cluster, creates the test table pre-split at "11", "22", ..., "88",
-  * disables the balancer, opens the shared async connection and writes {@code COUNT} rows whose
-  * row key is the zero padded index and whose value is the index itself.
-  */
- @BeforeClass
- public static void setUp() throws Exception {
-   TEST_UTIL.startMiniCluster(3);
-   byte[][] splitKeys = new byte[8][];
-   for (int idx = 0; idx < splitKeys.length; idx++) {
-     splitKeys[idx] = Bytes.toBytes(String.format("%02d", (idx + 1) * 11));
-   }
-   TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
-   TEST_UTIL.waitTableAvailable(TABLE_NAME);
-   TEST_UTIL.getAdmin().setBalancerRunning(false, true);
-   ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-   RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
-   List<CompletableFuture<?>> futures = new ArrayList<>();
-   for (int row = 0; row < COUNT; row++) {
-     byte[] rowKey = Bytes.toBytes(String.format("%02d", row));
-     futures.add(table.put(new Put(rowKey).addColumn(FAMILY, CQ, Bytes.toBytes(row))));
-   }
-   // Block until every put has finished so the tests start from a fully loaded table.
-   CompletableFuture<?>[] pending = futures.toArray(new CompletableFuture<?>[0]);
-   CompletableFuture.allOf(pending).get();
- }
-
- /**
-  * Closes the shared async connection and shuts the mini cluster down.
-  * <p>
-  * The cluster shutdown runs in a {@code finally} block so that it still happens when closing
-  * the connection throws, and the connection is only closed when it was actually created (it is
-  * still {@code null} if {@code setUp} failed before assigning it).
-  */
- @AfterClass
- public static void tearDown() throws Exception {
-   try {
-     if (ASYNC_CONN != null) {
-       ASYNC_CONN.close();
-     }
-   } finally {
-     TEST_UTIL.shutdownMiniCluster();
-   }
- }
-
- /**
-  * Moves one online region of the test table from the server returned by
-  * {@code getRSForFirstRegionInTable} to any other region server, then sleeps for one second.
-  */
- private void move() throws IOException, InterruptedException {
-   HRegionServer source = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
-   HRegionServer target = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-       .map(thread -> thread.getRegionServer())
-       .filter(candidate -> candidate != source)
-       .findAny().get();
-   Region regionToMove = source.getOnlineRegions(TABLE_NAME).stream().findAny().get();
-   byte[] encodedRegionName = regionToMove.getRegionInfo().getEncodedNameAsBytes();
-   byte[] targetServerName = Bytes.toBytes(target.getServerName().getServerName());
-   TEST_UTIL.getAdmin().move(encodedRegionName, targetServerName);
-   Thread.sleep(1000);
- }
-
- /**
-  * Fetches all {@code COUNT} rows through {@code getFunc} and checks every value, twice: once
-  * against the initial region layout and once more after a region has been moved.
-  *
-  * @param getFunc performs the multi get on the given table and returns the results in request
-  *          order
-  */
- private void test(BiFunction<AsyncTableBase, List<Get>, List<Result>> getFunc)
-     throws IOException, InterruptedException {
-   AsyncTableBase table = getTable.get();
-   List<Get> gets = IntStream.range(0, COUNT)
-       .mapToObj(row -> new Get(Bytes.toBytes(String.format("%02d", row))))
-       .collect(Collectors.toList());
-   for (int round = 0; round < 2; round++) {
-     if (round > 0) {
-       // test basic failure recovery
-       move();
-     }
-     List<Result> fetched = getFunc.apply(table, gets);
-     assertEquals(COUNT, fetched.size());
-     for (int row = 0; row < COUNT; row++) {
-       Result rowResult = fetched.get(row);
-       assertEquals(row, Bytes.toInt(rowResult.getValue(FAMILY, CQ)));
-     }
-   }
- }
-
- /**
-  * Runs the shared scenario through {@code table.get(List)}, waiting on each returned future
-  * individually. An interruption while waiting restores the thread's interrupt flag before it is
-  * rethrown unchecked, so callers further up can still observe it.
-  */
- @Test
- public void testGet() throws InterruptedException, IOException {
-   test((table, gets) -> {
-     return table.get(gets).stream().map(f -> {
-       try {
-         return f.get();
-       } catch (InterruptedException e) {
-         // The lambda can not propagate the checked exception; keep the interrupt status instead.
-         Thread.currentThread().interrupt();
-         throw new RuntimeException(e);
-       } catch (ExecutionException e) {
-         throw new RuntimeException(e);
-       }
-     }).collect(Collectors.toList());
-   });
- }
-
- /**
-  * Runs the shared scenario through {@code table.getAll(List)}, waiting on the single combined
-  * future. An interruption while waiting restores the thread's interrupt flag before it is
-  * rethrown unchecked, so callers further up can still observe it.
-  */
- @Test
- public void testGetAll() throws InterruptedException, IOException {
-   test((table, gets) -> {
-     try {
-       return table.getAll(gets).get();
-     } catch (InterruptedException e) {
-       // The lambda can not propagate the checked exception; keep the interrupt status instead.
-       Thread.currentThread().interrupt();
-       throw new RuntimeException(e);
-     } catch (ExecutionException e) {
-       throw new RuntimeException(e);
-     }
-   });
- }
-}
[2/2] hbase git commit: HBASE-17345 Implement batch
Posted by zh...@apache.org.
HBASE-17345 Implement batch
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8fa5b0b9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8fa5b0b9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8fa5b0b9
Branch: refs/heads/master
Commit: 8fa5b0b946c01516076fa944a310b33224ff21a9
Parents: 8da7366
Author: zhangduo <zh...@apache.org>
Authored: Thu Dec 22 19:42:15 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Dec 25 20:36:52 2016 +0800
----------------------------------------------------------------------
.../client/AsyncBatchRpcRetryingCaller.java | 476 +++++++++++++++++++
.../client/AsyncMultiGetRpcRetryingCaller.java | 407 ----------------
.../client/AsyncRpcRetryingCallerFactory.java | 39 +-
.../AsyncScanSingleRegionRpcRetryingCaller.java | 2 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 2 +-
.../hadoop/hbase/client/AsyncTableBase.java | 103 +++-
.../hadoop/hbase/client/AsyncTableImpl.java | 4 +-
.../hadoop/hbase/client/ConnectionUtils.java | 62 ++-
.../hadoop/hbase/client/RawAsyncTableImpl.java | 7 +-
.../hbase/shaded/protobuf/RequestConverter.java | 5 +-
.../client/AbstractTestAsyncTableScan.java | 12 +-
.../hbase/client/TestAsyncGetMultiThread.java | 150 ------
.../hbase/client/TestAsyncTableBatch.java | 236 +++++++++
.../client/TestAsyncTableGetMultiThreaded.java | 149 ++++++
.../hbase/client/TestAsyncTableMultiGet.java | 163 -------
15 files changed, 1032 insertions(+), 785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
new file mode 100644
index 0000000..6f0b8e9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.util.AtomicUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for batch.
+ * <p>
+ * Note that {@link #operationTimeoutNs} is now the total time limit for the whole batch, the same
+ * as for other single operations.
+ * <p>
+ * And the {@link #maxAttempts} is a limit for each single operation in the batch logically. In the
+ * implementation, we will record a {@code tries} parameter for each operation group, and if it is
+ * split to several groups when retrying, the sub groups will inherit the {@code tries}. You can
+ * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of
+ * the depth of the tree.
+ */
+@InterfaceAudience.Private
+class AsyncBatchRpcRetryingCaller<T> {
+
+ private static final Log LOG = LogFactory.getLog(AsyncBatchRpcRetryingCaller.class);
+
+ private final HashedWheelTimer retryTimer;
+
+ private final AsyncConnectionImpl conn;
+
+ private final TableName tableName;
+
+ private final List<Action> actions;
+
+ private final List<CompletableFuture<T>> futures;
+
+ private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
+
+ private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
+
+ private final long pauseNs;
+
+ private final int maxAttempts;
+
+ private final long operationTimeoutNs;
+
+ private final long readRpcTimeoutNs;
+
+ private final long writeRpcTimeoutNs;
+
+ private final int startLogErrorsCnt;
+
+ private final long startNs;
+
+ /**
+  * The actions of one batch attempt that target a single region, together with the location
+  * they were resolved to.
+  * <p>
+  * We can not use HRegionLocation as the map key because the hashCode and equals method of
+  * HRegionLocation only consider serverName.
+  */
+ private static final class RegionRequest {
+   /** Actions queued for this region. */
+   public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
+   /** Location the actions were resolved to. */
+   public final HRegionLocation loc;
+   public RegionRequest(HRegionLocation location) {
+     this.loc = location;
+   }
+ }
+
+ private static final class ServerRequest {
+
+ public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
+ new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+
+ public final AtomicLong rpcTimeoutNs;
+
+ public ServerRequest(long defaultRpcTimeoutNs) {
+ this.rpcTimeoutNs = new AtomicLong(defaultRpcTimeoutNs);
+ }
+
+ public void addAction(HRegionLocation loc, Action action, long rpcTimeoutNs) {
+ computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
+ () -> new RegionRequest(loc)).actions.add(action);
+ // try update the timeout to a larger value
+ if (this.rpcTimeoutNs.get() <= 0) {
+ return;
+ }
+ if (rpcTimeoutNs <= 0) {
+ this.rpcTimeoutNs.set(-1L);
+ return;
+ }
+ AtomicUtils.updateMax(this.rpcTimeoutNs, rpcTimeoutNs);
+ }
+ }
+
+ /**
+  * Wraps every row operation into an {@code Action} carrying its original index, creates one
+  * result future per action and records the start time used for the operation timeout.
+  * {@code Append} and {@code Increment} operations are assigned a fresh nonce.
+  *
+  * @param maxRetries converted to the attempt limit through {@code retries2Attempts}
+  */
+ public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+     TableName tableName, List<? extends Row> actions, long pauseNs, int maxRetries,
+     long operationTimeoutNs, long readRpcTimeoutNs, long writeRpcTimeoutNs,
+     int startLogErrorsCnt) {
+   this.retryTimer = retryTimer;
+   this.conn = conn;
+   this.tableName = tableName;
+   this.pauseNs = pauseNs;
+   this.maxAttempts = retries2Attempts(maxRetries);
+   this.operationTimeoutNs = operationTimeoutNs;
+   this.readRpcTimeoutNs = readRpcTimeoutNs;
+   this.writeRpcTimeoutNs = writeRpcTimeoutNs;
+   this.startLogErrorsCnt = startLogErrorsCnt;
+   int actionCount = actions.size();
+   this.actions = new ArrayList<>(actionCount);
+   this.futures = new ArrayList<>(actionCount);
+   this.action2Future = new IdentityHashMap<>(actionCount);
+   this.action2Errors = new IdentityHashMap<>();
+   for (int index = 0; index < actionCount; index++) {
+     Row row = actions.get(index);
+     Action wrapped = new Action(row, index);
+     boolean needsNonce = row instanceof Append || row instanceof Increment;
+     if (needsNonce) {
+       wrapped.setNonce(conn.getNonceGenerator().newNonce());
+     }
+     CompletableFuture<T> pending = new CompletableFuture<>();
+     this.actions.add(wrapped);
+     this.futures.add(pending);
+     this.action2Future.put(wrapped, pending);
+   }
+   // Taken last so the operation timeout is measured from the end of construction.
+   this.startNs = System.nanoTime();
+ }
+
+ /** Returns the operation timeout minus the nanoseconds elapsed since {@code startNs}. */
+ private long remainingTimeNs() {
+   long elapsedNs = System.nanoTime() - startNs;
+   return operationTimeoutNs - elapsedNs;
+ }
+
+ /**
+  * Detaches and returns the errors recorded so far for {@code action}, or {@code null} when none
+  * were recorded. The map is accessed under its own monitor.
+  */
+ private List<ThrowableWithExtraContext> removeErrors(Action action) {
+   List<ThrowableWithExtraContext> removed;
+   synchronized (action2Errors) {
+     removed = action2Errors.remove(action);
+   }
+   return removed;
+ }
+
+ private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
+ Throwable error, ServerName serverName) {
+ if (tries > startLogErrorsCnt) {
+ String regions =
+ regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
+ .collect(Collectors.joining(",", "[", "]"));
+ LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
+ + " failed, tries=" + tries,
+ error);
+ }
+ }
+
+ private String getExtraContextForError(ServerName serverName) {
+ return serverName != null ? serverName.getServerName() : "";
+ }
+
+ private void addError(Action action, Throwable error, ServerName serverName) {
+ List<ThrowableWithExtraContext> errors;
+ synchronized (action2Errors) {
+ errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
+ }
+ errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
+ getExtraContextForError(serverName)));
+ }
+
+ private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
+ actions.forEach(action -> addError(action, error, serverName));
+ }
+
+ /**
+  * Completes the future of {@code action} exceptionally with a {@code RetriesExhaustedException}
+  * that carries the previously recorded errors followed by {@code error}. Does nothing when the
+  * future is already done.
+  */
+ private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
+   CompletableFuture<T> pending = action2Future.get(action);
+   if (!pending.isDone()) {
+     ThrowableWithExtraContext latest = new ThrowableWithExtraContext(error, currentTime, extras);
+     List<ThrowableWithExtraContext> history = removeErrors(action);
+     if (history != null) {
+       history.add(latest);
+     } else {
+       history = Collections.singletonList(latest);
+     }
+     pending.completeExceptionally(new RetriesExhaustedException(tries - 1, history));
+   }
+ }
+
+ private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ String extras = getExtraContextForError(serverName);
+ actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
+ }
+
+ /**
+  * Fails every not yet completed action in the stream with a {@code RetriesExhaustedException}
+  * built from the errors recorded for it so far (an empty list when there are none).
+  */
+ private void failAll(Stream<Action> actions, int tries) {
+   actions.forEach(action -> {
+     CompletableFuture<T> pending = action2Future.get(action);
+     if (!pending.isDone()) {
+       List<ThrowableWithExtraContext> history = removeErrors(action);
+       if (history == null) {
+         history = Collections.emptyList();
+       }
+       pending.completeExceptionally(new RetriesExhaustedException(tries, history));
+     }
+   });
+ }
+
+ /**
+  * Builds the multi request for one server: one region action per entry of
+  * {@code actionsByRegion}. The nonce group is set once, as soon as any action carrying a nonce
+  * is seen.
+  *
+  * @param actionsByRegion the region requests to encode, keyed by region name
+  * @param cells passed on to {@code RequestConverter.buildNoDataRegionAction}
+  */
+ private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
+     List<CellScannable> cells) throws IOException {
+   ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
+   ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
+   ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+   ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
+   for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
+     byte[] regionName = entry.getKey();
+     ConcurrentLinkedQueue<Action> regionActions = entry.getValue().actions;
+     // TODO: remove the extra for loop as we will iterate it in mutationBuilder.
+     if (!multiRequestBuilder.hasNonceGroup()
+         && regionActions.stream().anyMatch(action -> action.hasNonce())) {
+       multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup());
+     }
+     regionActionBuilder.clear();
+     regionActionBuilder.setRegion(
+       RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
+     regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, regionActions,
+       cells, regionActionBuilder, actionBuilder, mutationBuilder);
+     multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+   }
+   return multiRequestBuilder.build();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
+ RegionResult regionResult, List<Action> failedActions) {
+ Object result = regionResult.result.get(action.getOriginalIndex());
+ if (result == null) {
+ LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
+ + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
+ + regionReq.loc.getRegionInfo().getRegionNameAsString());
+ addError(action, new RuntimeException("Invalid response"), serverName);
+ failedActions.add(action);
+ } else if (result instanceof Throwable) {
+ Throwable error = translateException((Throwable) result);
+ logException(tries, () -> Stream.of(regionReq), error, serverName);
+ if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+ failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
+ getExtraContextForError(serverName));
+ } else {
+ failedActions.add(action);
+ }
+ } else {
+ action2Future.get(action).complete((T) result);
+ }
+ }
+
+ /**
+  * Dispatches the multi response of one server to the region requests that were sent to it.
+  * <p>
+  * Regions with a result are handled action by action. Regions without a result are treated as
+  * failed as a whole, either with the exception reported for the region or, when the server
+  * reported neither results nor an exception, with an "Invalid response" error. In both cases
+  * the region's actions are failed when the error is not retriable or {@code tries} has reached
+  * {@code maxAttempts}, and are otherwise recorded and resubmitted, so their futures always
+  * complete eventually.
+  *
+  * @param actionsByRegion the region requests that were sent, keyed by region name
+  * @param tries the attempt number of the call that produced {@code resp}
+  * @param serverName the server that answered
+  * @param resp the decoded response
+  */
+ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
+     ServerName serverName, MultiResponse resp) {
+   List<Action> failedActions = new ArrayList<>();
+   actionsByRegion.forEach((rn, regionReq) -> {
+     RegionResult regionResult = resp.getResults().get(rn);
+     if (regionResult != null) {
+       regionReq.actions.forEach(
+         action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));
+       return;
+     }
+     Throwable t = resp.getException(rn);
+     Throwable error;
+     if (t == null) {
+       LOG.error(
+         "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+       error = new RuntimeException("Invalid response");
+     } else {
+       error = translateException(t);
+       logException(tries, () -> Stream.of(regionReq), error, serverName);
+       conn.getLocator().updateCachedLocation(regionReq.loc, error);
+     }
+     // Shared by both branches: an invalid response must not leave the futures pending forever.
+     if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+       failAll(regionReq.actions.stream(), tries, error, serverName);
+       return;
+     }
+     addError(regionReq.actions, error, serverName);
+     failedActions.addAll(regionReq.actions);
+   });
+   if (!failedActions.isEmpty()) {
+     tryResubmit(failedActions.stream(), tries);
+   }
+ }
+
+ /**
+  * Sends one multi call per server. When the operation timeout has already elapsed, every action
+  * is failed instead. Failures to obtain the stub, to build the request, a failed controller or
+  * an exception while processing the response are all routed to {@code onError}.
+  */
+ private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
+   long remainingNs = operationTimeoutNs > 0 ? remainingTimeNs() : Long.MAX_VALUE;
+   if (remainingNs <= 0) {
+     Stream<Action> allActions = actionsByServer.values().stream()
+         .flatMap(m -> m.actionsByRegion.values().stream()).flatMap(r -> r.actions.stream());
+     failAll(allActions, tries);
+     return;
+   }
+   actionsByServer.forEach((sn, serverReq) -> {
+     ClientService.Interface stub;
+     ClientProtos.MultiRequest req;
+     List<CellScannable> cells = new ArrayList<>();
+     try {
+       stub = conn.getRegionServerStub(sn);
+       req = buildReq(serverReq.actionsByRegion, cells);
+     } catch (IOException e) {
+       onError(serverReq.actionsByRegion, tries, e, sn);
+       return;
+     }
+     HBaseRpcController controller = conn.rpcControllerFactory.newController();
+     long callTimeoutNs = Math.min(serverReq.rpcTimeoutNs.get(), remainingNs);
+     resetController(controller, callTimeoutNs);
+     if (!cells.isEmpty()) {
+       controller.setCellScanner(createCellScanner(cells));
+     }
+     stub.multi(controller, req, resp -> {
+       if (controller.failed()) {
+         onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
+         return;
+       }
+       try {
+         onComplete(serverReq.actionsByRegion, tries, sn,
+           ResponseConverter.getResults(req, resp, controller.cellScanner()));
+       } catch (Exception e) {
+         onError(serverReq.actionsByRegion, tries, e, sn);
+       }
+     });
+   });
+ }
+
+ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
+ ServerName serverName) {
+ Throwable error = translateException(t);
+ logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
+ if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+ failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
+ serverName);
+ return;
+ }
+ List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
+ .collect(Collectors.toList());
+ addError(copiedActions, error, serverName);
+ tryResubmit(copiedActions.stream(), tries);
+ }
+
+ /**
+  * Schedules another attempt for {@code actions} on the retry timer after the pause computed for
+  * this attempt. With an operation timeout, the delay is capped at the remaining time minus
+  * {@code SLEEP_DELTA_NS}; if that cap is not positive the actions are failed right away.
+  */
+ private void tryResubmit(Stream<Action> actions, int tries) {
+   long delayNs;
+   if (operationTimeoutNs <= 0) {
+     delayNs = getPauseTime(pauseNs, tries - 1);
+   } else {
+     long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+     if (maxDelayNs <= 0) {
+       failAll(actions, tries);
+       return;
+     }
+     delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+   }
+   int nextTries = tries + 1;
+   retryTimer.newTimeout(t -> groupAndSend(actions, nextTries), delayNs, TimeUnit.NANOSECONDS);
+ }
+
+ /** Returns the read rpc timeout for a {@code Get} and the write rpc timeout for anything else. */
+ private long getRpcTimeoutNs(Action action) {
+   if (action.getAction() instanceof Get) {
+     return readRpcTimeoutNs;
+   }
+   return writeRpcTimeoutNs;
+ }
+
+ /**
+  * Locates the region of every action, groups the located actions by server and region, and
+  * once all lookups have finished sends the grouped requests and resubmits the actions whose
+  * lookup failed with a retriable error. Actions whose lookup fails with a
+  * {@code DoNotRetryIOException} are failed immediately. When the operation timeout has already
+  * elapsed, all actions are failed without locating anything.
+  */
+ private void groupAndSend(Stream<Action> actions, int tries) {
+   boolean hasOperationTimeout = operationTimeoutNs > 0;
+   long locateTimeoutNs = hasOperationTimeout ? remainingTimeNs() : -1L;
+   if (hasOperationTimeout && locateTimeoutNs <= 0) {
+     failAll(actions, tries);
+     return;
+   }
+   ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
+   ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
+   // use the small one as the default timeout value, and increase the timeout value if we have an
+   // action in the group needs a larger timeout value.
+   long defaultRpcTimeoutNs;
+   if (readRpcTimeoutNs > 0 && writeRpcTimeoutNs > 0) {
+     defaultRpcTimeoutNs = Math.min(readRpcTimeoutNs, writeRpcTimeoutNs);
+   } else if (readRpcTimeoutNs > 0) {
+     defaultRpcTimeoutNs = readRpcTimeoutNs;
+   } else if (writeRpcTimeoutNs > 0) {
+     defaultRpcTimeoutNs = writeRpcTimeoutNs;
+   } else {
+     defaultRpcTimeoutNs = -1L;
+   }
+   CompletableFuture<?>[] locateFutures = actions
+       .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
+         RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, locateError) -> {
+           if (locateError == null) {
+             computeIfAbsent(actionsByServer, loc.getServerName(),
+               () -> new ServerRequest(defaultRpcTimeoutNs)).addAction(loc, action,
+                 getRpcTimeoutNs(action));
+             return;
+           }
+           Throwable error = translateException(locateError);
+           if (error instanceof DoNotRetryIOException) {
+             failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
+             return;
+           }
+           addError(action, error, null);
+           locateFailed.add(action);
+         }))
+       .toArray(CompletableFuture[]::new);
+   CompletableFuture.allOf(locateFutures).whenComplete((v, r) -> {
+     if (!actionsByServer.isEmpty()) {
+       send(actionsByServer, tries);
+     }
+     if (!locateFailed.isEmpty()) {
+       tryResubmit(locateFailed.stream(), tries);
+     }
+   });
+ }
+
+ /**
+  * Starts the first attempt for all actions and returns their futures, in the order in which
+  * the futures were created by the constructor.
+  */
+ public List<CompletableFuture<T>> call() {
+   Stream<Action> firstAttempt = actions.stream();
+   groupAndSend(firstAttempt, 1);
+   return futures;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
deleted file mode 100644
index e1208c2..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import io.netty.util.HashedWheelTimer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * Retry caller for multi get.
- * <p>
- * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with
- * other single operations
- * <p>
- * And the {@link #maxAttempts} is a limit for each single get in the batch logically. In the
- * implementation, we will record a {@code tries} parameter for each operation group, and if it is
- * split to several groups when retrying, the sub groups will inherit {@code tries}. You can imagine
- * that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of the depth
- * of the tree.
- */
-@InterfaceAudience.Private
-class AsyncMultiGetRpcRetryingCaller {
-
- private static final Log LOG = LogFactory.getLog(AsyncMultiGetRpcRetryingCaller.class);
-
- private final HashedWheelTimer retryTimer;
-
- private final AsyncConnectionImpl conn;
-
- private final TableName tableName;
-
- private final List<Get> gets;
-
- private final List<CompletableFuture<Result>> futures;
-
- private final IdentityHashMap<Get, CompletableFuture<Result>> get2Future;
-
- private final IdentityHashMap<Get, List<ThrowableWithExtraContext>> get2Errors;
-
- private final long pauseNs;
-
- private final int maxAttempts;
-
- private final long operationTimeoutNs;
-
- private final long rpcTimeoutNs;
-
- private final int startLogErrorsCnt;
-
- private final long startNs;
-
- // we can not use HRegionLocation as the map key because the hashCode and equals method of
- // HRegionLocation only consider serverName.
- private static final class RegionRequest {
-
- public final HRegionLocation loc;
-
- public final ConcurrentLinkedQueue<Get> gets = new ConcurrentLinkedQueue<>();
-
- public RegionRequest(HRegionLocation loc) {
- this.loc = loc;
- }
- }
-
- public AsyncMultiGetRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- TableName tableName, List<Get> gets, long pauseNs, int maxRetries, long operationTimeoutNs,
- long rpcTimeoutNs, int startLogErrorsCnt) {
- this.retryTimer = retryTimer;
- this.conn = conn;
- this.tableName = tableName;
- this.gets = gets;
- this.pauseNs = pauseNs;
- this.maxAttempts = retries2Attempts(maxRetries);
- this.operationTimeoutNs = operationTimeoutNs;
- this.rpcTimeoutNs = rpcTimeoutNs;
- this.startLogErrorsCnt = startLogErrorsCnt;
-
- this.futures = new ArrayList<>(gets.size());
- this.get2Future = new IdentityHashMap<>(gets.size());
- gets.forEach(
- get -> futures.add(get2Future.computeIfAbsent(get, k -> new CompletableFuture<>())));
- this.get2Errors = new IdentityHashMap<>();
- this.startNs = System.nanoTime();
- }
-
- private long remainingTimeNs() {
- return operationTimeoutNs - (System.nanoTime() - startNs);
- }
-
- private List<ThrowableWithExtraContext> removeErrors(Get get) {
- synchronized (get2Errors) {
- return get2Errors.remove(get);
- }
- }
-
- private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
- Throwable error, ServerName serverName) {
- if (tries > startLogErrorsCnt) {
- String regions =
- regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
- .collect(Collectors.joining(",", "[", "]"));
- LOG.warn("Get data for " + regions + " in " + tableName + " from " + serverName
- + " failed, tries=" + tries,
- error);
- }
- }
-
- private String getExtras(ServerName serverName) {
- return serverName != null ? serverName.getServerName() : "";
- }
-
- private void addError(Get get, Throwable error, ServerName serverName) {
- List<ThrowableWithExtraContext> errors;
- synchronized (get2Errors) {
- errors = get2Errors.computeIfAbsent(get, k -> new ArrayList<>());
- }
- errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
- serverName != null ? serverName.toString() : ""));
- }
-
- private void addError(Iterable<Get> gets, Throwable error, ServerName serverName) {
- gets.forEach(get -> addError(get, error, serverName));
- }
-
- private void failOne(Get get, int tries, Throwable error, long currentTime, String extras) {
- CompletableFuture<Result> future = get2Future.get(get);
- if (future.isDone()) {
- return;
- }
- ThrowableWithExtraContext errorWithCtx =
- new ThrowableWithExtraContext(error, currentTime, extras);
- List<ThrowableWithExtraContext> errors = removeErrors(get);
- if (errors == null) {
- errors = Collections.singletonList(errorWithCtx);
- } else {
- errors.add(errorWithCtx);
- }
- future.completeExceptionally(new RetriesExhaustedException(tries, errors));
- }
-
- private void failAll(Stream<Get> gets, int tries, Throwable error, ServerName serverName) {
- long currentTime = System.currentTimeMillis();
- String extras = getExtras(serverName);
- gets.forEach(get -> failOne(get, tries, error, currentTime, extras));
- }
-
- private void failAll(Stream<Get> gets, int tries) {
- gets.forEach(get -> {
- CompletableFuture<Result> future = get2Future.get(get);
- if (future.isDone()) {
- return;
- }
- future.completeExceptionally(new RetriesExhaustedException(tries,
- Optional.ofNullable(removeErrors(get)).orElse(Collections.emptyList())));
- });
- }
-
- private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> getsByRegion)
- throws IOException {
- ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
- for (Map.Entry<byte[], RegionRequest> entry : getsByRegion.entrySet()) {
- ClientProtos.RegionAction.Builder regionActionBuilder =
- ClientProtos.RegionAction.newBuilder().setRegion(
- RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
- int index = 0;
- for (Get get : entry.getValue().gets) {
- regionActionBuilder.addAction(
- ClientProtos.Action.newBuilder().setIndex(index).setGet(ProtobufUtil.toGet(get)));
- index++;
- }
- multiRequestBuilder.addRegionAction(regionActionBuilder);
- }
- return multiRequestBuilder.build();
- }
-
- private void onComplete(Map<byte[], RegionRequest> getsByRegion, int tries, ServerName serverName,
- MultiResponse resp) {
- List<Get> failedGets = new ArrayList<>();
- getsByRegion.forEach((rn, regionReq) -> {
- RegionResult regionResult = resp.getResults().get(rn);
- if (regionResult != null) {
- int index = 0;
- for (Get get : regionReq.gets) {
- Object result = regionResult.result.get(index);
- if (result == null) {
- LOG.error("Server sent us neither result nor exception for row '"
- + Bytes.toStringBinary(get.getRow()) + "' of " + Bytes.toStringBinary(rn));
- addError(get, new RuntimeException("Invalid response"), serverName);
- failedGets.add(get);
- } else if (result instanceof Throwable) {
- Throwable error = translateException((Throwable) result);
- logException(tries, () -> Stream.of(regionReq), error, serverName);
- if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
- failOne(get, tries, error, EnvironmentEdgeManager.currentTime(),
- getExtras(serverName));
- } else {
- failedGets.add(get);
- }
- } else {
- get2Future.get(get).complete((Result) result);
- }
- index++;
- }
- } else {
- Throwable t = resp.getException(rn);
- Throwable error;
- if (t == null) {
- LOG.error(
- "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
- error = new RuntimeException("Invalid response");
- } else {
- error = translateException(t);
- logException(tries, () -> Stream.of(regionReq), error, serverName);
- conn.getLocator().updateCachedLocation(regionReq.loc, error);
- if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
- failAll(regionReq.gets.stream(), tries, error, serverName);
- return;
- }
- addError(regionReq.gets, error, serverName);
- failedGets.addAll(regionReq.gets);
- }
- }
- });
- if (!failedGets.isEmpty()) {
- tryResubmit(failedGets.stream(), tries);
- }
- }
-
- private void send(Map<ServerName, ? extends Map<byte[], RegionRequest>> getsByServer, int tries) {
- long callTimeoutNs;
- if (operationTimeoutNs > 0) {
- long remainingNs = remainingTimeNs();
- if (remainingNs <= 0) {
- failAll(getsByServer.values().stream().flatMap(m -> m.values().stream())
- .flatMap(r -> r.gets.stream()),
- tries);
- return;
- }
- callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
- } else {
- callTimeoutNs = rpcTimeoutNs;
- }
- getsByServer.forEach((sn, getsByRegion) -> {
- ClientService.Interface stub;
- try {
- stub = conn.getRegionServerStub(sn);
- } catch (IOException e) {
- onError(getsByRegion, tries, e, sn);
- return;
- }
- ClientProtos.MultiRequest req;
- try {
- req = buildReq(getsByRegion);
- } catch (IOException e) {
- onError(getsByRegion, tries, e, sn);
- return;
- }
- HBaseRpcController controller = conn.rpcControllerFactory.newController();
- resetController(controller, callTimeoutNs);
- stub.multi(controller, req, resp -> {
- if (controller.failed()) {
- onError(getsByRegion, tries, controller.getFailed(), sn);
- } else {
- try {
- onComplete(getsByRegion, tries, sn,
- ResponseConverter.getResults(req, resp, controller.cellScanner()));
- } catch (Exception e) {
- onError(getsByRegion, tries, e, sn);
- return;
- }
- }
- });
- });
- }
-
- private void onError(Map<byte[], RegionRequest> getsByRegion, int tries, Throwable t,
- ServerName serverName) {
- Throwable error = translateException(t);
- logException(tries, () -> getsByRegion.values().stream(), error, serverName);
- if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
- failAll(getsByRegion.values().stream().flatMap(r -> r.gets.stream()), tries, error,
- serverName);
- return;
- }
- List<Get> copiedGets =
- getsByRegion.values().stream().flatMap(r -> r.gets.stream()).collect(Collectors.toList());
- addError(copiedGets, error, serverName);
- tryResubmit(copiedGets.stream(), tries);
- }
-
- private void tryResubmit(Stream<Get> gets, int tries) {
- long delayNs;
- if (operationTimeoutNs > 0) {
- long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
- if (maxDelayNs <= 0) {
- failAll(gets, tries);
- return;
- }
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
- } else {
- delayNs = getPauseTime(pauseNs, tries - 1);
- }
- retryTimer.newTimeout(t -> groupAndSend(gets, tries + 1), delayNs, TimeUnit.NANOSECONDS);
- }
-
- private void groupAndSend(Stream<Get> gets, int tries) {
- long locateTimeoutNs;
- if (operationTimeoutNs > 0) {
- locateTimeoutNs = remainingTimeNs();
- if (locateTimeoutNs <= 0) {
- failAll(gets, tries);
- return;
- }
- } else {
- locateTimeoutNs = -1L;
- }
- ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer =
- new ConcurrentHashMap<>();
- ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
- CompletableFuture.allOf(gets.map(get -> conn.getLocator()
- .getRegionLocation(tableName, get.getRow(), RegionLocateType.CURRENT, locateTimeoutNs)
- .whenComplete((loc, error) -> {
- if (error != null) {
- error = translateException(error);
- if (error instanceof DoNotRetryIOException) {
- failOne(get, tries, error, EnvironmentEdgeManager.currentTime(), "");
- return;
- }
- addError(get, error, null);
- locateFailed.add(get);
- } else {
- ConcurrentMap<byte[], RegionRequest> getsByRegion = computeIfAbsent(getsByServer,
- loc.getServerName(), () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
- computeIfAbsent(getsByRegion, loc.getRegionInfo().getRegionName(),
- () -> new RegionRequest(loc)).gets.add(get);
- }
- })).toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
- if (!getsByServer.isEmpty()) {
- send(getsByServer, tries);
- }
- if (!locateFailed.isEmpty()) {
- tryResubmit(locateFailed.stream(), tries);
- }
- });
- }
-
- public List<CompletableFuture<Result>> call() {
- groupAndSend(gets.stream(), 1);
- return futures;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index d240fab..c90bee2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -258,48 +258,55 @@ class AsyncRpcRetryingCallerFactory {
return new ScanSingleRegionCallerBuilder();
}
- public class MultiGetCallerBuilder {
+ public class BatchCallerBuilder {
private TableName tableName;
- private List<Get> gets;
+ private List<? extends Row> actions;
private long operationTimeoutNs = -1L;
- private long rpcTimeoutNs = -1L;
+ private long readRpcTimeoutNs = -1L;
+
+ private long writeRpcTimeoutNs = -1L;
- public MultiGetCallerBuilder table(TableName tableName) {
+ public BatchCallerBuilder table(TableName tableName) {
this.tableName = tableName;
return this;
}
- public MultiGetCallerBuilder gets(List<Get> gets) {
- this.gets = gets;
+ public BatchCallerBuilder actions(List<? extends Row> actions) {
+ this.actions = actions;
return this;
}
- public MultiGetCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
+ public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(operationTimeout);
return this;
}
- public MultiGetCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
- this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+ public BatchCallerBuilder readRpcTimeout(long rpcTimeout, TimeUnit unit) {
+ this.readRpcTimeoutNs = unit.toNanos(rpcTimeout);
+ return this;
+ }
+
+ public BatchCallerBuilder writeRpcTimeout(long rpcTimeout, TimeUnit unit) {
+ this.writeRpcTimeoutNs = unit.toNanos(rpcTimeout);
return this;
}
- public AsyncMultiGetRpcRetryingCaller build() {
- return new AsyncMultiGetRpcRetryingCaller(retryTimer, conn, tableName, gets,
+ public <T> AsyncBatchRpcRetryingCaller<T> build() {
+ return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions,
conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
- rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+ readRpcTimeoutNs, writeRpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
}
- public List<CompletableFuture<Result>> call() {
- return build().call();
+ public <T> List<CompletableFuture<T>> call() {
+ return this.<T> build().call();
}
}
- public MultiGetCallerBuilder multiGet() {
- return new MultiGetCallerBuilder();
+ public BatchCallerBuilder batch() {
+ return new BatchCallerBuilder();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 81c806f..5bf6195 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -161,7 +161,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
if (closeScanner) {
closeScanner();
}
- future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+ future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 0b4add1..04e69af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -120,7 +120,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
}
private void completeExceptionally() {
- future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+ future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
}
private void onError(Throwable error, Supplier<String> errMsg,
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index a2b5247..19a22c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatch;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatchAll;
+
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -30,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* The base interface for asynchronous version of Table. Obtain an instance from a
@@ -126,11 +129,7 @@ public interface AsyncTableBase {
* be wrapped by a {@link CompletableFuture}.
*/
default CompletableFuture<Boolean> exists(Get get) {
- if (!get.isCheckExistenceOnly()) {
- get = ReflectionUtils.newInstance(get.getClass(), get);
- get.setCheckExistenceOnly(true);
- }
- return get(get).thenApply(r -> r.getExists());
+ return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
}
/**
@@ -362,7 +361,9 @@ public interface AsyncTableBase {
* @param gets The objects that specify what data to fetch and from which rows.
* @return A list of {@link CompletableFuture}s that represent the result for each get.
*/
- List<CompletableFuture<Result>> get(List<Get> gets);
+ default List<CompletableFuture<Result>> get(List<Get> gets) {
+ return batch(gets);
+ }
/**
* A simple version for batch get. It will fail if there are any failures and you will get the
@@ -371,8 +372,90 @@ public interface AsyncTableBase {
* @return A {@link CompletableFuture} that wrapper the result list.
*/
default CompletableFuture<List<Result>> getAll(List<Get> gets) {
- List<CompletableFuture<Result>> futures = get(gets);
+ return batchAll(gets);
+ }
+
+ /**
+ * Test for the existence of columns in the table, as specified by the Gets.
+ * <p>
+ * This will return a list of booleans. Each value will be true if the related Get matches one or
+ * more keys, false if not.
+ * <p>
+ * This is a server-side call so it prevents any data from being transferred to the client.
+ * @param gets the Gets
+ * @return A list of {@link CompletableFuture}s that represent the existence for each get.
+ */
+ default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
+ return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists()))
+ .collect(toList());
+ }
+
+ /**
+ * A simple version for batch exists. It will fail if there are any failures and you will get the
+ * whole result boolean list at once if the operation succeeds.
+ * @param gets the Gets
+ * @return A {@link CompletableFuture} that wraps the result boolean list.
+ */
+ default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
+ return getAll(toCheckExistenceOnly(gets))
+ .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList()));
+ }
+
+ /**
+ * Puts some data in the table, in batch.
+ * @param puts The list of mutations to apply.
+ * @return A list of {@link CompletableFuture}s that represent the result for each put.
+ */
+ default List<CompletableFuture<Void>> put(List<Put> puts) {
+ return voidBatch(this, puts);
+ }
+
+ /**
+ * A simple version of batch put. It will fail if there are any failures.
+ * @param puts The list of mutations to apply.
+ * @return A {@link CompletableFuture} that always returns null when it completes normally.
+ */
+ default CompletableFuture<Void> putAll(List<Put> puts) {
+ return voidBatchAll(this, puts);
+ }
+
+ /**
+ * Deletes the specified cells/rows in bulk.
+ * @param deletes list of things to delete.
+ * @return A list of {@link CompletableFuture}s that represent the result for each delete.
+ */
+ default List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+ return voidBatch(this, deletes);
+ }
+
+ /**
+ * A simple version of batch delete. It will fail if there are any failures.
+ * @param deletes list of things to delete.
+ * @return A {@link CompletableFuture} that always returns null when it completes normally.
+ */
+ default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
+ return voidBatchAll(this, deletes);
+ }
+
+ /**
+ * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
+ * execution of the actions is not defined. Meaning if you do a Put and a Get in the same
+ * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
+ * had put.
+ * @param actions list of Get, Put, Delete, Increment, Append objects
+ * @return A list of {@link CompletableFuture}s that represent the result for each action.
+ */
+ <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
+
+ /**
+ * A simple version of batch. It will fail if there are any failures and you will get the whole
+ * result list at once if the operation succeeds.
+ * @param actions list of Get, Put, Delete, Increment, Append objects
+ * @return A list of the results for the actions, wrapped by a {@link CompletableFuture}.
+ */
+ default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
+ List<CompletableFuture<T>> futures = batch(actions);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
- .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
+ .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 6cc2551..7281185 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -194,7 +194,7 @@ class AsyncTableImpl implements AsyncTable {
}
@Override
- public List<CompletableFuture<Result>> get(List<Get> gets) {
- return rawTable.get(gets).stream().map(this::wrap).collect(Collectors.toList());
+ public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+ return rawTable.<T> batch(actions).stream().map(this::wrap).collect(Collectors.toList());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index cc27992..4355182 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
@@ -28,6 +29,8 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
/**
@@ -59,11 +63,11 @@ public final class ConnectionUtils {
private static final Log LOG = LogFactory.getLog(ConnectionUtils.class);
- private ConnectionUtils() {}
+ private ConnectionUtils() {
+ }
/**
- * Calculate pause time.
- * Built on {@link HConstants#RETRY_BACKOFF}.
+ * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}.
* @param pause time to pause
* @param tries amount of tries
* @return How long to wait after <code>tries</code> retries
@@ -83,7 +87,6 @@ public final class ConnectionUtils {
return normalPause + jitter;
}
-
/**
* Adds / subs an up to 50% jitter to a pause time. Minimum is 1.
* @param pause the expected pause.
@@ -103,24 +106,23 @@ public final class ConnectionUtils {
* @param cnm Replaces the nonce generator used, for testing.
* @return old nonce generator.
*/
- public static NonceGenerator injectNonceGeneratorForTesting(
- ClusterConnection conn, NonceGenerator cnm) {
+ public static NonceGenerator injectNonceGeneratorForTesting(ClusterConnection conn,
+ NonceGenerator cnm) {
return ConnectionImplementation.injectNonceGeneratorForTesting(conn, cnm);
}
/**
- * Changes the configuration to set the number of retries needed when using Connection
- * internally, e.g. for updating catalog tables, etc.
- * Call this method before we create any Connections.
+ * Changes the configuration to set the number of retries needed when using Connection internally,
+ * e.g. for updating catalog tables, etc. Call this method before we create any Connections.
* @param c The Configuration instance to set the retries into.
* @param log Used to log what we set in here.
*/
- public static void setServerSideHConnectionRetriesConfig(
- final Configuration c, final String sn, final Log log) {
+ public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn,
+ final Log log) {
// TODO: Fix this. Not all connections from server side should have 10 times the retries.
int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- // Go big. Multiply by 10. If we can't get to meta after this many retries
+ // Go big. Multiply by 10. If we can't get to meta after this many retries
// then something seriously wrong.
int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
int retries = hcRetries * serversideMultiplier;
@@ -141,9 +143,9 @@ public final class ConnectionUtils {
* @throws IOException if IO failure occurred
*/
public static ClusterConnection createShortCircuitConnection(final Configuration conf,
- ExecutorService pool, User user, final ServerName serverName,
- final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
- throws IOException {
+ ExecutorService pool, User user, final ServerName serverName,
+ final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
+ throws IOException {
if (user == null) {
user = UserProvider.instantiate(conf).getCurrent();
}
@@ -166,8 +168,7 @@ public final class ConnectionUtils {
*/
@VisibleForTesting
public static void setupMasterlessConnection(Configuration conf) {
- conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
- MasterlessConnection.class.getName());
+ conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, MasterlessConnection.class.getName());
}
/**
@@ -175,8 +176,7 @@ public final class ConnectionUtils {
* region re-lookups.
*/
static class MasterlessConnection extends ConnectionImplementation {
- MasterlessConnection(Configuration conf,
- ExecutorService pool, User user) throws IOException {
+ MasterlessConnection(Configuration conf, ExecutorService pool, User user) throws IOException {
super(conf, pool, user);
}
@@ -197,8 +197,7 @@ public final class ConnectionUtils {
/**
* Get a unique key for the rpc stub to the given server.
*/
- static String getStubKey(String serviceName, ServerName serverName,
- boolean hostnameCanChange) {
+ static String getStubKey(String serviceName, ServerName serverName, boolean hostnameCanChange) {
// Sometimes, servers go down and they come back up with the same hostname but a different
// IP address. Force a resolution of the rsHostname by trying to instantiate an
// InetSocketAddress, and this way we will rightfully get a new stubKey.
@@ -327,4 +326,25 @@ public final class ConnectionUtils {
// Add a delta to avoid timeout immediately after a retry sleeping.
static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
+
+  /**
+   * Converts a Get into one that only checks for existence.
+   * <p>
+   * A Get that already has the flag set is returned as is. Otherwise a new instance of the same
+   * class is obtained from {@code ReflectionUtils.newInstance} with the original passed as the
+   * argument (presumably resolving to a copy constructor, TODO confirm), and the flag is set on
+   * that new instance rather than on the caller's object.
+   */
+  static Get toCheckExistenceOnly(Get get) {
+    return get.isCheckExistenceOnly() ? get
+        : ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true);
+  }
+
+  /**
+   * Applies the single-Get conversion to every element of the list, in order, and collects the
+   * results into a new list. The input list itself is not modified.
+   */
+  static List<Get> toCheckExistenceOnly(List<Get> gets) {
+    return gets.stream().map(get -> toCheckExistenceOnly(get)).collect(toList());
+  }
+
+  /**
+   * Runs the actions through {@code table.batch} and returns one future per returned future,
+   * in the same order, each completing with {@code null} instead of the operation's result.
+   */
+  static List<CompletableFuture<Void>> voidBatch(AsyncTableBase table,
+      List<? extends Row> actions) {
+    List<CompletableFuture<Object>> resultFutures = table.<Object> batch(actions);
+    return resultFutures.stream()
+        .map(resultFuture -> resultFuture.<Void> thenApply(ignored -> null))
+        .collect(toList());
+  }
+
+  /**
+   * Runs the actions through {@code table.batchAll} and maps the aggregated result to
+   * {@code null}, so callers only observe completion or failure of the whole batch.
+   */
+  static CompletableFuture<Void> voidBatchAll(AsyncTableBase table, List<? extends Row> actions) {
+    CompletableFuture<List<Object>> allResults = table.<Object> batchAll(actions);
+    return allResults.thenApply(ignored -> null);
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 6fad0da..347c85b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -407,9 +407,10 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
@Override
- public List<CompletableFuture<Result>> get(List<Get> gets) {
- return conn.callerFactory.multiGet().table(tableName).gets(gets)
+ public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+ return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+ .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 446cd89..424d578 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -52,9 +52,8 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
@@ -670,7 +669,7 @@ public final class RequestConverter {
* @throws IOException
*/
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
- final List<Action> actions, final List<CellScannable> cells,
+ final Iterable<Action> actions, final List<CellScannable> cells,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index 3028111..5614d8e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -22,10 +22,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -62,12 +60,10 @@ public abstract class AbstractTestAsyncTableScan {
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
- RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
- List<CompletableFuture<?>> futures = new ArrayList<>();
- IntStream.range(0, COUNT).forEach(
- i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
- .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+ ASYNC_CONN.getRawTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
+ .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
+ .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
+ .collect(Collectors.toList())).get();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
deleted file mode 100644
index d24501d..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
-import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.IntStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Will split the table, and move region randomly when testing.
- */
-@Category({ LargeTests.class, ClientTests.class })
-public class TestAsyncGetMultiThread {
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("async");
-
- private static byte[] FAMILY = Bytes.toBytes("cf");
-
- private static byte[] QUALIFIER = Bytes.toBytes("cq");
-
- private static int COUNT = 1000;
-
- private static AsyncConnection CONN;
-
- private static byte[][] SPLIT_KEYS;
-
- @BeforeClass
- public static void setUp() throws Exception {
- TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
- TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
- TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
- TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
- TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
- TEST_UTIL.startMiniCluster(5);
- SPLIT_KEYS = new byte[8][];
- for (int i = 111; i < 999; i += 111) {
- SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
- }
- TEST_UTIL.createTable(TABLE_NAME, FAMILY);
- TEST_UTIL.waitTableAvailable(TABLE_NAME);
- CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
- RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
- List<CompletableFuture<?>> futures = new ArrayList<>();
- IntStream.range(0, COUNT)
- .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
- .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- IOUtils.closeQuietly(CONN);
- TEST_UTIL.shutdownMiniCluster();
- }
-
- private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
- while (!stop.get()) {
- int i = ThreadLocalRandom.current().nextInt(COUNT);
- assertEquals(i,
- Bytes.toInt(
- CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
- .getValue(FAMILY, QUALIFIER)));
- }
- }
-
- @Test
- public void test() throws IOException, InterruptedException, ExecutionException {
- int numThreads = 20;
- AtomicBoolean stop = new AtomicBoolean(false);
- ExecutorService executor =
- Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
- List<Future<?>> futures = new ArrayList<>();
- IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
- run(stop);
- return null;
- })));
- Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
- Admin admin = TEST_UTIL.getAdmin();
- for (byte[] splitPoint : SPLIT_KEYS) {
- admin.split(TABLE_NAME, splitPoint);
- for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
- region.compact(true);
- }
- Thread.sleep(5000);
- admin.balancer(true);
- Thread.sleep(5000);
- ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
- ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
- .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
- .findAny().get();
- admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
- Bytes.toBytes(newMetaServer.getServerName()));
- Thread.sleep(5000);
- }
- stop.set(true);
- executor.shutdown();
- for (Future<?> future : futures) {
- future.get();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
new file mode 100644
index 0000000..308b9e5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Exercises the batch style operations of {@link AsyncTableBase} (putAll, getAll, deleteAll,
+ * batchAll and the list flavour of get) against a pre-split table.
+ * <p>
+ * Every test runs twice, once against a {@link RawAsyncTable} and once against an
+ * {@link AsyncTable}, selected through the parameterized {@link #tableGetter}.
+ */
+@RunWith(Parameterized.class)
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableBatch {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final int COUNT = 1000;
+
+  // Shared by all tests; created in setUp() and closed in tearDown().
+  private static AsyncConnection CONN;
+
+  // The eight split keys "111", "222", ..., "888"; filled in by setUp().
+  private static byte[][] SPLIT_KEYS;
+
+  /** Human readable name of the table flavour under test, only used in the test name. */
+  @Parameter(0)
+  public String tableType;
+
+  /** Produces the table implementation under test for a given table name. */
+  @Parameter(1)
+  public Function<TableName, AsyncTableBase> tableGetter;
+
+  private static RawAsyncTable getRawTable(TableName tableName) {
+    return CONN.getRawTable(tableName);
+  }
+
+  private static AsyncTable getTable(TableName tableName) {
+    return CONN.getTable(tableName, ForkJoinPool.commonPool());
+  }
+
+  /**
+   * @return the two parameter sets: the raw table and the normal table backed by the common
+   *         fork join pool.
+   */
+  @Parameters(name = "{index}: type={0}")
+  public static List<Object[]> params() {
+    Function<TableName, AsyncTableBase> rawTableGetter = TestAsyncTableBatch::getRawTable;
+    Function<TableName, AsyncTableBase> tableGetter = TestAsyncTableBatch::getTable;
+    return Arrays.asList(new Object[] { "raw", rawTableGetter },
+      new Object[] { "normal", tableGetter });
+  }
+
+  /** Starts a three node mini cluster, computes the split keys and opens the connection. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Creates a fresh pre-split table for every test method. */
+  @Before
+  public void setUpBeforeTest() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  /** Drops the table, disabling it first if a test left it enabled. */
+  @After
+  public void tearDownAfterTest() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    if (admin.isTableEnabled(TABLE_NAME)) {
+      admin.disableTable(TABLE_NAME);
+    }
+    admin.deleteTable(TABLE_NAME);
+  }
+
+  /** @return the row key for index {@code i}: the zero padded three digit decimal string. */
+  private byte[] getRow(int i) {
+    return Bytes.toBytes(String.format("%03d", i));
+  }
+
+  /**
+   * Writes COUNT rows, reads them back interleaved with gets for rows that were never written,
+   * splits every region, then deletes all rows and checks that they are gone.
+   */
+  @Test
+  public void test() throws InterruptedException, ExecutionException, IOException {
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(IntStream.range(0, COUNT)
+        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
+        .collect(Collectors.toList())).get();
+    // For every row ask for the row itself and for the same key with one extra zero byte
+    // appended. The latter was never written, so its result must be empty.
+    List<Result> results =
+        table
+            .getAll(IntStream.range(0, COUNT)
+                .mapToObj(
+                  i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
+                .flatMap(l -> l.stream()).collect(Collectors.toList()))
+            .get();
+    assertEquals(2 * COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
+      assertTrue(results.get(2 * i + 1).isEmpty());
+    }
+    Admin admin = TEST_UTIL.getAdmin();
+    admin.flush(TABLE_NAME);
+    TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).forEach(r -> {
+      // Split point is the numeric start key plus 55; the first region has an empty start key
+      // and is split at "110".
+      byte[] startKey = r.getRegionInfo().getStartKey();
+      int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
+      byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
+      try {
+        admin.splitRegion(r.getRegionInfo().getRegionName(), splitPoint);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+    // we are not going to test the function of split so no assertion here. Just wait for a while
+    // and then start our work.
+    Thread.sleep(5000);
+    table.deleteAll(
+      IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
+        .get();
+    results = table
+        .getAll(
+          IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
+        .get();
+    assertEquals(COUNT, results.size());
+    results.forEach(r -> assertTrue(r.isEmpty()));
+  }
+
+  /**
+   * Sends one Get, Put, Delete, Increment and Append in a single batchAll call and verifies the
+   * returned results as well as the resulting table content.
+   */
+  @Test
+  public void testMixed() throws InterruptedException, ExecutionException {
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    // Rows are the 4 byte encodings of 0..4, each holding its index as an 8 byte long.
+    table.putAll(IntStream.range(0, 5)
+        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
+        .collect(Collectors.toList())).get();
+    List<Row> actions = new ArrayList<>();
+    actions.add(new Get(Bytes.toBytes(0)));
+    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 2)));
+    actions.add(new Delete(Bytes.toBytes(2)));
+    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
+    actions.add(new Append(Bytes.toBytes(4)).add(FAMILY, CQ, Bytes.toBytes(4)));
+    List<Object> results = table.batchAll(actions).get();
+    assertEquals(5, results.size());
+    Result getResult = (Result) results.get(0);
+    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
+    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
+    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
+    Result incrementResult = (Result) results.get(3);
+    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
+    // The appended cell is the original 8 byte long followed by the appended 4 byte int.
+    Result appendResult = (Result) results.get(4);
+    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
+    assertEquals(12, appendValue.length);
+    assertEquals(4, Bytes.toLong(appendValue));
+    assertEquals(4, Bytes.toInt(appendValue, 8));
+  }
+
+  /**
+   * Coprocessor that makes every Get fail on the region whose end key is empty, i.e. the last
+   * region of the table.
+   */
+  public static final class ErrorInjectObserver extends BaseRegionObserver {
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
+        List<Cell> results) throws IOException {
+      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
+        throw new DoNotRetryRegionException("Inject Error");
+      }
+    }
+  }
+
+  /**
+   * Installs {@link ErrorInjectObserver} and issues one Get per split key. The Gets that hit
+   * the first seven regions must succeed, while the Get for the last split key lands on the
+   * last region and must fail with a {@link RetriesExhaustedException}.
+   */
+  @Test
+  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
+    Admin admin = TEST_UTIL.getAdmin();
+    HTableDescriptor htd = admin.getTableDescriptor(TABLE_NAME);
+    htd.addCoprocessor(ErrorInjectObserver.class.getName());
+    admin.modifyTable(TABLE_NAME, htd);
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
+        .collect(Collectors.toList())).get();
+    List<CompletableFuture<Result>> futures = table
+        .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
+    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
+      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
+    }
+    try {
+      futures.get(SPLIT_KEYS.length - 1).get();
+      // Without this the test would pass silently if the injected error never surfaced.
+      // AssertionError is not an ExecutionException, so the catch below does not swallow it.
+      throw new AssertionError("Get on the last region should have failed with the injected error");
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
new file mode 100644
index 0000000..da8141b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs many concurrent single row gets through the async client while the table is being
+ * split, compacted and balanced and while the meta region is being moved between region
+ * servers. Every get must still return the value that was written for its row.
+ */
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreaded {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static final int COUNT = 1000;
+
+  // Shared by the reader threads; created in setUp() and closed in tearDown().
+  private static AsyncConnection CONN;
+
+  // The eight split points "111", "222", ..., "888"; filled in by setUp().
+  private static byte[][] SPLIT_KEYS;
+
+  /**
+   * Starts a five node mini cluster with a short read rpc timeout and a large retry count,
+   * creates the (initially unsplit) table and loads COUNT rows whose value is the row index.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
+    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
+    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
+    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.startMiniCluster(5);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    CONN.getRawTable(TABLE_NAME)
+        .putAll(
+          IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
+              .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
+        .get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Reader loop: until {@code stop} is set, fetches a random row and checks that its value
+   * equals the row index.
+   */
+  private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
+    while (!stop.get()) {
+      int i = ThreadLocalRandom.current().nextInt(COUNT);
+      assertEquals(i,
+        Bytes.toInt(
+          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
+              .getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  /**
+   * Starts 20 reader threads and then, for every split point in shuffled order, splits the
+   * table, compacts its regions, runs the balancer and moves meta to another region server.
+   * Finally stops the readers and rethrows any failure one of them hit.
+   */
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    int numThreads = 20;
+    AtomicBoolean stop = new AtomicBoolean(false);
+    ExecutorService executor =
+        Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    List<Future<?>> futures = new ArrayList<>();
+    IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
+      run(stop);
+      return null;
+    })));
+    try {
+      // Arrays.asList is a view, so this shuffles SPLIT_KEYS itself; the fixed seed keeps the
+      // split order reproducible.
+      Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
+      Admin admin = TEST_UTIL.getAdmin();
+      for (byte[] splitPoint : SPLIT_KEYS) {
+        admin.split(TABLE_NAME, splitPoint);
+        for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+          region.compact(true);
+        }
+        Thread.sleep(5000);
+        admin.balancer(true);
+        Thread.sleep(5000);
+        ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+        ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+            .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
+            .findAny().get();
+        admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+          Bytes.toBytes(newMetaServer.getServerName()));
+        Thread.sleep(5000);
+      }
+    } finally {
+      // Always stop the readers, even when an admin operation failed, so they do not keep
+      // hammering the cluster after this test has ended.
+      stop.set(true);
+      executor.shutdown();
+    }
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
+}