You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/07/04 02:14:06 UTC
[3/5] hbase git commit: HBASE-18283 Provide a construct method which
accept a thread pool for AsyncAdmin
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
new file mode 100644
index 0000000..fcfdf93
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -0,0 +1,2278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.quotas.QuotaFilter;
+import org.apache.hadoop.hbase.quotas.QuotaSettings;
+import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The implementation of AsyncAdmin.
+ */
+@InterfaceAudience.Private
+public class RawAsyncHBaseAdmin implements AsyncAdmin {
+ public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
+
+ private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
+
+ private final AsyncConnectionImpl connection;
+
+ private final RawAsyncTable metaTable;
+
+ private final long rpcTimeoutNs;
+
+ private final long operationTimeoutNs;
+
+ private final long pauseNs;
+
+ private final int maxAttempts;
+
+ private final int startLogErrorsCnt;
+
+ private final NonceGenerator ng;
+
+ RawAsyncHBaseAdmin(AsyncConnectionImpl connection) {
+ this.connection = connection;
+ this.metaTable = connection.getRawTable(META_TABLE_NAME);
+ this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
+ this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
+ this.pauseNs = connection.connConf.getPauseNs();
+ this.maxAttempts = connection.connConf.getMaxRetries();
+ this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
+ this.ng = connection.getNonceGenerator();
+ }
+
+ private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
+ return this.connection.callerFactory.<T> masterRequest()
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
+ }
+
+ private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
+ return this.connection.callerFactory.<T> adminRequest()
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
+ }
+
+ @FunctionalInterface
+ private interface MasterRpcCall<RESP, REQ> {
+ void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
+ RpcCallback<RESP> done);
+ }
+
+ @FunctionalInterface
+ private interface AdminRpcCall<RESP, REQ> {
+ void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
+ RpcCallback<RESP> done);
+ }
+
+ @FunctionalInterface
+ private interface Converter<D, S> {
+ D convert(S src) throws IOException;
+ }
+
+ private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
+ MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
+ Converter<RESP, PRESP> respConverter) {
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
+
+ @Override
+ public void run(PRESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ try {
+ future.complete(respConverter.convert(resp));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+ private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
+ AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
+ Converter<RESP, PRESP> respConverter) {
+
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
+
+ @Override
+ public void run(PRESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(new IOException(controller.errorText()));
+ } else {
+ try {
+ future.complete(respConverter.convert(resp));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+ private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
+ MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+ ProcedureBiConsumer consumer) {
+ CompletableFuture<Long> procFuture = this
+ .<Long> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
+ respConverter)).call();
+ return waitProcedureResult(procFuture).whenComplete(consumer);
+ }
+
+ @FunctionalInterface
+ private interface TableOperator {
+ CompletableFuture<Void> operate(TableName table);
+ }
+
+ private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern,
+ TableOperator operator, String operationType) {
+ CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>();
+ List<TableDescriptor> failed = new LinkedList<>();
+ listTables(Optional.ofNullable(pattern), false).whenComplete(
+ (tables, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ CompletableFuture[] futures =
+ tables.stream()
+ .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> {
+ if (ex != null) {
+ LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
+ failed.add(table);
+ }
+ })).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
+ CompletableFuture.allOf(futures).thenAccept((v) -> {
+ future.complete(failed);
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> tableExists(TableName tableName) {
+ return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern,
+ boolean includeSysTables) {
+ return this.<List<TableDescriptor>> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
+ controller, stub,
+ RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables),
+ (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+ (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern,
+ boolean includeSysTables) {
+ return this.<List<TableName>> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub,
+ RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables),
+ (s, c, req, done) -> s.getTableNames(c, req, done),
+ (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) {
+ CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
+ this.<List<TableSchema>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
+ controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
+ c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
+ .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!tableSchemas.isEmpty()) {
+ future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0)));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
+ int numRegions) {
+ try {
+ return createTable(desc, Optional.of(getSplitKeys(startKey, endKey, numRegions)));
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) {
+ if (desc.getTableName() == null) {
+ return failedFuture(new IllegalArgumentException("TableName cannot be null"));
+ }
+ try {
+ splitKeys.ifPresent(keys -> verifySplitKeys(keys));
+ return this.<CreateTableRequest, CreateTableResponse> procedureCall(RequestConverter
+ .buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()), (s, c, req,
+ done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
+ new CreateTableProcedureBiConsumer(this, desc.getTableName()));
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTable(TableName tableName) {
+ return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
+ .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
+ new DeleteTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) {
+ return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
+ }
+
+ @Override
+ public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
+ return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
+ RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
+ (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> enableTable(TableName tableName) {
+ return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
+ .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
+ new EnableTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) {
+ return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
+ }
+
+ @Override
+ public CompletableFuture<Void> disableTable(TableName tableName) {
+ return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
+ .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
+ new DisableTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) {
+ return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (state.isPresent()) {
+ future.complete(state.get().inStates(TableState.State.ENABLED));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (state.isPresent()) {
+ future.complete(state.get().inStates(TableState.State.DISABLED));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
+ return isTableAvailable(tableName, null);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ isTableEnabled(tableName).whenComplete(
+ (enabled, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!enabled) {
+ future.complete(false);
+ } else {
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
+ .whenComplete(
+ (locations, error1) -> {
+ if (error1 != null) {
+ future.completeExceptionally(error1);
+ return;
+ }
+ int notDeployed = 0;
+ int regionCount = 0;
+ for (HRegionLocation location : locations) {
+ HRegionInfo info = location.getRegionInfo();
+ if (location.getServerName() == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " has not deployed region "
+ + info.getEncodedName());
+ }
+ notDeployed++;
+ } else if (splitKeys != null
+ && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+ for (byte[] splitKey : splitKeys) {
+ // Just check if the splitkey is available
+ if (Bytes.equals(info.getStartKey(), splitKey)) {
+ regionCount++;
+ break;
+ }
+ }
+ } else {
+ // Always empty start row should be counted
+ regionCount++;
+ }
+ }
+ if (notDeployed > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
+ }
+ future.complete(false);
+ } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " expected to have "
+ + (splitKeys.length + 1) + " regions, but only " + regionCount
+ + " available");
+ }
+ future.complete(false);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " should be available");
+ }
+ future.complete(true);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
+ return this
+ .<Pair<Integer, Integer>>newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
+ controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
+ c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
+ resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
+ return this.<AddColumnRequest, AddColumnResponse> procedureCall(
+ RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
+ new AddColumnFamilyProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
+ return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
+ RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
+ (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
+ ColumnFamilyDescriptor columnFamily) {
+ return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
+ RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
+ (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
+ return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
+ RequestConverter.buildCreateNamespaceRequest(descriptor),
+ (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
+ new CreateNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ }
+
+ @Override
+ public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
+ return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
+ RequestConverter.buildModifyNamespaceRequest(descriptor),
+ (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
+ new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteNamespace(String name) {
+ return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
+ RequestConverter.buildDeleteNamespaceRequest(name),
+ (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
+ new DeleteNamespaceProcedureBiConsumer(this, name));
+ }
+
+ @Override
+ public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
+ return this
+ .<NamespaceDescriptor> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
+ controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
+ req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
+ .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+ }
+
+ @Override
+ public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
+ return this
+ .<List<NamespaceDescriptor>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
+ controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
+ done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
+ .toNamespaceDescriptorList(resp))).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
+ stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
+ (s, c, req, done) -> s.setBalancerRunning(c, req, done),
+ (resp) -> resp.getPrevBalanceValue())).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> balance(boolean forcible) {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
+ stub, RequestConverter.buildBalanceRequest(forcible),
+ (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isBalancerOn() {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
+ controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
+ (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete((location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName();
+ if (server == null) {
+ future.completeExceptionally(new NotServingRegionException(regionName));
+ } else {
+ closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(result);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) {
+ return this
+ .<Boolean> newAdminCaller()
+ .action(
+ (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
+ controller, stub,
+ ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()),
+ (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed()))
+ .serverName(serverName).call();
+ }
+
+ @Override
+ public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
+ return this.<List<HRegionInfo>> newAdminCaller()
+ .action((controller, stub) -> this
+ .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
+ controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
+ (s, c, req, done) -> s.getOnlineRegion(c, req, done),
+ resp -> ProtobufUtil.getRegionInfos(resp)))
+ .serverName(sn).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> flush(TableName tableName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete((exists, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (!exists) {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ } else {
+ isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (!tableEnabled) {
+ future.completeExceptionally(new TableNotEnabledException(tableName));
+ } else {
+ execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+ new HashMap<>()).whenComplete((ret, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(ret);
+ }
+ });
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> flushRegion(byte[] regionName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete(
+ (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future.completeExceptionally(new NoServerForRegionException(Bytes
+ .toStringBinary(regionName)));
+ return;
+ }
+
+ HRegionInfo regionInfo = location.getRegionInfo();
+ this.<Void> newAdminCaller()
+ .serverName(serverName)
+ .action(
+ (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
+ controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
+ .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
+ resp -> null)).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) {
+ return compact(tableName, columnFamily, false, CompactType.NORMAL);
+ }
+
+ @Override
+ public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+ return compactRegion(regionName, columnFamily, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) {
+ return compact(tableName, columnFamily, true, CompactType.NORMAL);
+ }
+
+ @Override
+ public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+ return compactRegion(regionName, columnFamily, true);
+ }
+
+ @Override
+ public CompletableFuture<Void> compactRegionServer(ServerName sn) {
+ return compactRegionServer(sn, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
+ return compactRegionServer(sn, true);
+ }
+
+ private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+ if (hRegionInfos != null) {
+ hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty())));
+ }
+ CompletableFuture
+ .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily,
+ boolean major) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete(
+ (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future.completeExceptionally(new NoServerForRegionException(Bytes
+ .toStringBinary(regionName)));
+ return;
+ }
+ compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ /**
+ * List all region locations for the specific table.
+ */
+ private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
+ if (TableName.META_TABLE_NAME.equals(tableName)) {
+ CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+ // For meta table, we use zk to fetch all locations.
+ AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
+ registry.getMetaRegionLocation().whenComplete(
+ (metaRegions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (metaRegions == null || metaRegions.isEmpty()
+ || metaRegions.getDefaultRegionLocation() == null) {
+ future.completeExceptionally(new IOException("meta region does not found"));
+ } else {
+ future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+ }
+ // close the registry.
+ IOUtils.closeQuietly(registry);
+ });
+ return future;
+ } else {
+ // For non-meta table, we fetch all locations by scanning hbase:meta table
+ return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
+ }
+ }
+
+ /**
+ * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
+ */
+ private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily,
+ final boolean major, CompactType compactType) {
+ if (CompactType.MOB.equals(compactType)) {
+ // TODO support MOB compact.
+ return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+ for (HRegionLocation location : locations) {
+ if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
+ if (location.getServerName() == null) continue;
+ compactFutures
+ .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
+ }
+ // future complete unless all of the compact futures are completed.
+ CompletableFuture
+ .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ /**
+ * Compact the region at specific region server.
+ */
+ private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
+ final boolean major, Optional<byte[]> columnFamily) {
+ return this
+ .<Void> newAdminCaller()
+ .serverName(sn)
+ .action(
+ (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
+ controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
+ major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
+ resp -> null)).call();
+ }
+
+ private byte[] toEncodeRegionName(byte[] regionName) {
+ try {
+ return HRegionInfo.isEncodedRegionName(regionName) ? regionName
+ : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
+ } catch (IOException e) {
+ return regionName;
+ }
+ }
+
+ private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
+ CompletableFuture<TableName> result) {
+ getRegionLocation(encodeRegionName).whenComplete(
+ (location, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ return;
+ }
+ HRegionInfo regionInfo = location.getRegionInfo();
+ if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ result.completeExceptionally(new IllegalArgumentException(
+ "Can't invoke merge on non-default regions directly"));
+ return;
+ }
+ if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+ if (!tableName.get().equals(regionInfo.getTable())) {
+ // tables of this two region should be same.
+ result.completeExceptionally(new IllegalArgumentException(
+ "Cannot merge regions from two different tables " + tableName.get() + " and "
+ + regionInfo.getTable()));
+ } else {
+ result.complete(tableName.get());
+ }
+ }
+ });
+ }
+
+ private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
+ byte[] encodeRegionNameB) {
+ AtomicReference<TableName> tableNameRef = new AtomicReference<>();
+ CompletableFuture<TableName> future = new CompletableFuture<>();
+
+ checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
+ checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
+ boolean forcible) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
+ final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
+
+ checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
+ .whenComplete((tableName, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+
+ MergeTableRegionsRequest request = null;
+ try {
+ request = RequestConverter.buildMergeTableRegionsRequest(
+ new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
+ ng.newNonce());
+ } catch (DeserializationException e) {
+ future.completeExceptionally(e);
+ return;
+ }
+
+ this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
+ (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
+ new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> split(TableName tableName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete((exist, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!exist) {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ return;
+ }
+ metaTable
+ .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
+ .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
+ .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
+ .whenComplete((results, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (results != null && !results.isEmpty()) {
+ List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
+ for (Result r : results) {
+ if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
+ RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+ if (rl != null) {
+ for (HRegionLocation h : rl.getRegionLocations()) {
+ if (h != null && h.getServerName() != null) {
+ HRegionInfo hri = h.getRegionInfo();
+ if (hri == null || hri.isSplitParent()
+ || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
+ continue;
+ splitFutures.add(split(h.getServerName(), hri, Optional.empty()));
+ }
+ }
+ }
+ }
+ CompletableFuture
+ .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
+ .whenComplete((ret, exception) -> {
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ return;
+ }
+ future.complete(ret);
+ });
+ } else {
+ future.complete(null);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ if (splitPoint == null) {
+ return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
+ }
+ connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
+ .whenComplete((loc, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ } else if (loc == null || loc.getRegionInfo() == null) {
+ result.completeExceptionally(new IllegalArgumentException(
+ "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
+ } else {
+ splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint))
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ result.completeExceptionally(err2);
+ } else {
+ result.complete(ret);
+ }
+
+ });
+ }
+ });
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete(
+ (location, err) -> {
+ HRegionInfo regionInfo = location.getRegionInfo();
+ if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ future.completeExceptionally(new IllegalArgumentException(
+ "Can't split replicas directly. "
+ + "Replicas are auto-split when their primary is split."));
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future.completeExceptionally(new NoServerForRegionException(Bytes
+ .toStringBinary(regionName)));
+ return;
+ }
+ split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
+ Optional<byte[]> splitPoint) {
+ if (hri.getStartKey() != null && splitPoint.isPresent()
+ && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
+ return failedFuture(new IllegalArgumentException(
+ "should not give a splitkey which equals to startkey!"));
+ }
+ return this
+ .<Void> newAdminCaller()
+ .action(
+ (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
+ controller, stub,
+ ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
+ (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
+ .serverName(sn).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> assign(byte[] regionName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
+ .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
+ resp -> null))).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ ((controller, stub) -> this
+ .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+ RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
+ (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> offline(byte[] regionName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
+ .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
+ resp -> null))).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildMoveRegionRequest(
+ regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s
+ .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> setQuota(QuotaSettings quota) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
+ stub, QuotaSettings.buildSetQuotaRequestProto(quota),
+ (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
+ CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
+ Scan scan = QuotaTableUtil.makeScan(filter);
+ this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
+ .scan(scan, new RawScanResultConsumer() {
+ List<QuotaSettings> settings = new ArrayList<>();
+
+ @Override
+ public void onNext(Result[] results, ScanController controller) {
+ for (Result result : results) {
+ try {
+ QuotaTableUtil.parseResultToCollection(result, settings);
+ } catch (IOException e) {
+ controller.terminate();
+ future.completeExceptionally(e);
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ future.completeExceptionally(error);
+ }
+
+ @Override
+ public void onComplete() {
+ future.complete(settings);
+ }
+ });
+ return future;
+ }
+
+ public CompletableFuture<Void> addReplicationPeer(String peerId,
+ ReplicationPeerConfig peerConfig) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
+ RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
+ done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> removeReplicationPeer(String peerId) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
+ stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
+ (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> enableReplicationPeer(String peerId) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
+ stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
+ (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> disableReplicationPeer(String peerId) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
+ controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
+ c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
+ .call();
+ }
+
+ public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
+ return this
+ .<ReplicationPeerConfig> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
+ controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
+ s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
+ (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
+ ReplicationPeerConfig peerConfig) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
+ controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
+ peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
+ resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+ Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return failedFuture(new ReplicationException("tableCfs is null"));
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<Void>();
+ getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+ if (!completeExceptionally(future, error)) {
+ ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+ updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+ if (!completeExceptionally(future, error)) {
+ future.complete(result);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+ Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return failedFuture(new ReplicationException("tableCfs is null"));
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<Void>();
+ getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+ if (!completeExceptionally(future, error)) {
+ try {
+ ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+ } catch (ReplicationException e) {
+ future.completeExceptionally(e);
+ return;
+ }
+ updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+ if (!completeExceptionally(future, error)) {
+ future.complete(result);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) {
+ return this
+ .<List<ReplicationPeerDescription>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
+ controller,
+ stub,
+ RequestConverter.buildListReplicationPeersRequest(pattern),
+ (s, c, req, done) -> s.listReplicationPeers(c, req, done),
+ (resp) -> resp.getPeerDescList().stream()
+ .map(ReplicationSerDeHelper::toReplicationPeerDescription)
+ .collect(Collectors.toList()))).call();
+ }
+
+ @Override
+ public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
+ CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
+ listTables().whenComplete(
+ (tables, error) -> {
+ if (!completeExceptionally(future, error)) {
+ List<TableCFs> replicatedTableCFs = new ArrayList<>();
+ tables.forEach(table -> {
+ Map<String, Integer> cfs = new HashMap<>();
+ Stream.of(table.getColumnFamilies())
+ .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+ .forEach(column -> {
+ cfs.put(column.getNameAsString(), column.getScope());
+ });
+ if (!cfs.isEmpty()) {
+ replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+ }
+ });
+ future.complete(replicatedTableCFs);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
+ SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
+ .createHBaseProtosSnapshotDesc(snapshotDesc);
+ try {
+ ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
+ this.<Long> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
+ stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+ resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime = ConnectionUtils.getPauseTime(
+ TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ pauseTime = Math.min(pauseTime, maxPauseTime);
+ AsyncConnectionImpl.RETRY_TIMER
+ .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+ }
+ } );
+ } else {
+ future.completeExceptionally(new SnapshotCreationException("Snapshot '"
+ + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
+ + " ms", snapshotDesc));
+ }
+ }
+ };
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
+ controller,
+ stub,
+ IsSnapshotDoneRequest.newBuilder()
+ .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
+ req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
+ boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
+ HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
+ HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
+ return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
+ }
+
+ @Override
+ public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ listSnapshots(Optional.of(Pattern.compile(snapshotName))).whenComplete(
+ (snapshotDescriptions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TableName tableName = null;
+ if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+ for (SnapshotDescription snap : snapshotDescriptions) {
+ if (snap.getName().equals(snapshotName)) {
+ tableName = snap.getTableName();
+ break;
+ }
+ }
+ }
+ if (tableName == null) {
+ future.completeExceptionally(new RestoreSnapshotException(
+ "Unable to find the table name for snapshot=" + snapshotName));
+ return;
+ }
+ final TableName finalTableName = tableName;
+ tableExists(finalTableName)
+ .whenComplete((exists, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (!exists) {
+ // if table does not exist, then just clone snapshot into new table.
+ completeConditionalOnFuture(future,
+ internalRestoreSnapshot(snapshotName, finalTableName));
+ } else {
+ isTableDisabled(finalTableName).whenComplete(
+ (disabled, err4) -> {
+ if (err4 != null) {
+ future.completeExceptionally(err4);
+ } else if (!disabled) {
+ future.completeExceptionally(new TableNotDisabledException(finalTableName));
+ } else {
+ completeConditionalOnFuture(future,
+ restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
+ }
+ });
+ }
+ } );
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
+ boolean takeFailSafeSnapshot) {
+ if (takeFailSafeSnapshot) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ // Step.1 Take a snapshot of the current state
+ String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
+ HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+ HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+ final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
+ .replace("{snapshot.name}", snapshotName)
+ .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
+ .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
+ LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+ snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else {
+ // Step.2 Restore snapshot
+ internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
+ if (err2 != null) {
+ // Step.3.a Something went wrong during the restore and try to rollback.
+ internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
+ (void3, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
+ + failSafeSnapshotSnapshotName + " succeeded.";
+ future.completeExceptionally(new RestoreSnapshotException(msg));
+ }
+ });
+ } else {
+ // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
+ LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+ deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
+ (ret3, err3) -> {
+ if (err3 != null) {
+ LOG.error(
+ "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(ret3);
+ }
+ });
+ }
+ } );
+ }
+ } );
+ return future;
+ } else {
+ return internalRestoreSnapshot(snapshotName, tableName);
+ }
+ }
+
+ private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
+ CompletableFuture<T> parentFuture) {
+ parentFuture.whenComplete((res, err) -> {
+ if (err != null) {
+ dependentFuture.completeExceptionally(err);
+ } else {
+ dependentFuture.complete(res);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete((exists, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (exists) {
+ future.completeExceptionally(new TableExistsException(tableName));
+ } else {
+ completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
+ SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
+ .setName(snapshotName).setTable(tableName.getNameAsString()).build();
+ try {
+ ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ return waitProcedureResult(this
+ .<Long> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
+ controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
+ .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
+ done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
+ }
+
+ @Override
+ public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) {
+ CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
+ this.<GetCompletedSnapshotsResponse> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, GetCompletedSnapshotsResponse> call(
+ controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
+ done) -> s.getCompletedSnapshots(c, req, done), resp -> resp))
+ .call()
+ .whenComplete(
+ (resp, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ future.complete(resp
+ .getSnapshotsList()
+ .stream()
+ .map(ProtobufUtil::createSnapshotDesc)
+ .filter(
+ snap -> pattern.isPresent() ? pattern.get().matcher(snap.getName()).matches()
+ : true).collect(Collectors.toList()));
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
+ Pattern snapshotNamePattern) {
+ CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
+ listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
+ (tableNames, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (tableNames == null || tableNames.size() <= 0) {
+ future.complete(Collections.emptyList());
+ return;
+ }
+ listSnapshots(Optional.ofNullable(snapshotNamePattern)).whenComplete(
+ (snapshotDescList, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (snapshotDescList == null || snapshotDescList.isEmpty()) {
+ future.complete(Collections.emptyList());
+ return;
+ }
+ future.complete(snapshotDescList.stream()
+ .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
+ .collect(Collectors.toList()));
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
+ return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
+ return deleteTableSnapshots(null, snapshotNamePattern);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
+ Pattern snapshotNamePattern) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
+ ((snapshotDescriptions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
+ future.complete(null);
+ return;
+ }
+ List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
+ snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
+ .add(internalDeleteSnapshot(snapDesc)));
+ CompletableFuture.allOf(
+ deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
+ .thenAccept(v -> future.complete(v));
+ }));
+ return future;
+ }
+
+ private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
+ controller,
+ stub,
+ DeleteSnapshotRequest.newBuilder()
+ .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
+ req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> execProcedure(String signature, String instance,
+ Map<String, String> props) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ProcedureDescription procDesc =
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ this.<Long> newMasterCaller()
+ .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
+ controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
+ (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
+ .call().whenComplete((expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime = ConnectionUtils
+ .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ pauseTime = Math.min(pauseTime, maxPauseTime);
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+ TimeUnit.MICROSECONDS);
+ }
+ });
+ } else {
+ future.completeExceptionally(new IOException("Procedure '" + signature + " : "
+ + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
+ }
+ }
+ };
+ // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> execProcedureWithRet(String signature, String instance,
+ Map<String, String> props) {
+ ProcedureDescription proDesc =
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ return this.<byte[]> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
+ controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
+ (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
+ resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
+ Map<String, String> props) {
+ ProcedureDescription proDesc =
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ return this.<Boolean> newMasterCaller()
+ .action((controller, stub) -> this
+ .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
+ IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
+ (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
+ return this.<Boolean> newMasterCaller().action(
+ (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
+ controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
+ (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<List<ProcedureInfo>> listProcedures() {
+ return this
+ .<List<ProcedureInfo>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<ListProceduresRequest, ListProceduresResponse, List<ProcedureInfo>> call(
+ controller, stub, ListProceduresRequest.newBuilder().build(),
+ (s, c, req, done) -> s.listProcedures(c, req, done),
+ resp -> resp.getProcedureList().stream().map(ProtobufUtil::toProcedureInfo)
+ .collect(Collectors.toList()))).call();
+ }
+
+ /**
+ * Get the region location for the passed region name. The region name may be a full region name
+ * or encoded region name. If the region does not found, then it'll throw an
+ * UnknownRegionException wrapped by a {@link CompletableFuture}
+ * @param regionNameOrEncodedRegionName
+ * @return region location, wrapped by a {@link CompletableFuture}
+ */
+ @VisibleForTesting
+ CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
+ if (regionNameOrEncodedRegionName == null) {
+ return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
+ }
+ try {
+ CompletableFuture<Optional<HRegionLocation>> future;
+ if (HRegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
+ future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
+ regionNameOrEncodedRegionName);
+ } else {
+ future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
+ }
+
+ CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
+ future.whenComplete((location, err) -> {
+ if (err != null) {
+ returnedFuture.completeExceptionally(err);
+ return;
+ }
+ LOG.info("location is " + location);
+ if (!location.isPresent() || location.get().getRegionInfo() == null) {
+ LOG.info("unknown location is " + location);
+ returnedFuture.completeExceptionally(new UnknownRegionException(
+ "Invalid region name or encoded region name: "
+ + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+ } else {
+ returnedFuture.complete(location.get());
+ }
+ });
+ return returnedFuture;
+ } catch (IOException e) {
+ return failedFuture(e);
+ }
+ }
+
+ /**
+ * Get the region info for the passed region name. The region name may be a full region name or
+ * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
+ * wrapped by a {@link CompletableFuture}
+ * @param regionNameOrEncodedRegionName
+ * @return region info, wrapped by a {@link CompletableFuture}
+ */
+ private CompletableFuture<HRegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
+ if (regionNameOrEncodedRegionName == null) {
+ return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
+ }
+
+ if (Bytes.equals(regionNameOrEncodedRegionName,
+ HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+ || Bytes.equals(regionNameOrEncodedRegionName,
+ HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
+ return
<TRUNCATED>