You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/07/04 02:14:07 UTC
[4/5] hbase git commit: HBASE-18283 Provide a construct method which
accept a thread pool for AsyncAdmin
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 1da660c..36fd60d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -17,174 +17,27 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.stream.Stream;
-
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
-import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
-import org.apache.hadoop.hbase.TableNotDisabledException;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.client.replication.TableCFs;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
-import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
-import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
/**
@@ -192,2110 +45,404 @@ import org.apache.hadoop.hbase.util.Pair;
*/
@InterfaceAudience.Private
public class AsyncHBaseAdmin implements AsyncAdmin {
- public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
- private final AsyncConnectionImpl connection;
-
- private final RawAsyncTable metaTable;
-
- private final long rpcTimeoutNs;
-
- private final long operationTimeoutNs;
-
- private final long pauseNs;
-
- private final int maxAttempts;
-
- private final int startLogErrorsCnt;
+ private final RawAsyncHBaseAdmin rawAdmin;
- private final NonceGenerator ng;
-
- AsyncHBaseAdmin(AsyncConnectionImpl connection) {
- this.connection = connection;
- this.metaTable = connection.getRawTable(META_TABLE_NAME);
- this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
- this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
- this.pauseNs = connection.connConf.getPauseNs();
- this.maxAttempts = connection.connConf.getMaxRetries();
- this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
- this.ng = connection.getNonceGenerator();
- }
-
- private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
- return this.connection.callerFactory.<T> masterRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
- }
-
- private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
- return this.connection.callerFactory.<T> adminRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
- }
-
- @FunctionalInterface
- private interface MasterRpcCall<RESP, REQ> {
- void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
- RpcCallback<RESP> done);
- }
-
- @FunctionalInterface
- private interface AdminRpcCall<RESP, REQ> {
- void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
- RpcCallback<RESP> done);
- }
+ private final ExecutorService pool;
- @FunctionalInterface
- private interface Converter<D, S> {
- D convert(S src) throws IOException;
+ AsyncHBaseAdmin(RawAsyncHBaseAdmin rawAdmin, ExecutorService pool) {
+ this.rawAdmin = rawAdmin;
+ this.pool = pool;
}
- private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
- MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
- Converter<RESP, PRESP> respConverter) {
- CompletableFuture<RESP> future = new CompletableFuture<>();
- rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
- @Override
- public void run(PRESP resp) {
- if (controller.failed()) {
- future.completeExceptionally(controller.getFailed());
- } else {
- try {
- future.complete(respConverter.convert(resp));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
- }
- });
- return future;
- }
-
- private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
- AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
- Converter<RESP, PRESP> respConverter) {
-
- CompletableFuture<RESP> future = new CompletableFuture<>();
- rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
- @Override
- public void run(PRESP resp) {
- if (controller.failed()) {
- future.completeExceptionally(new IOException(controller.errorText()));
- } else {
- try {
- future.complete(respConverter.convert(resp));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
+ private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
+ CompletableFuture<T> asyncFuture = new CompletableFuture<>();
+ future.whenCompleteAsync((r, e) -> {
+ if (e != null) {
+ asyncFuture.completeExceptionally(e);
+ } else {
+ asyncFuture.complete(r);
}
- });
- return future;
- }
-
- private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
- MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
- ProcedureBiConsumer consumer) {
- CompletableFuture<Long> procFuture = this
- .<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
- respConverter)).call();
- return waitProcedureResult(procFuture).whenComplete(consumer);
- }
-
- @FunctionalInterface
- private interface TableOperator {
- CompletableFuture<Void> operate(TableName table);
- }
-
- private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern,
- TableOperator operator, String operationType) {
- CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>();
- List<TableDescriptor> failed = new LinkedList<>();
- listTables(Optional.ofNullable(pattern), false).whenComplete(
- (tables, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- CompletableFuture[] futures =
- tables.stream()
- .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> {
- if (ex != null) {
- LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
- failed.add(table);
- }
- })).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
- CompletableFuture.allOf(futures).thenAccept((v) -> {
- future.complete(failed);
- });
- });
- return future;
- }
-
- @Override
- public AsyncConnectionImpl getConnection() {
- return this.connection;
+ }, pool);
+ return asyncFuture;
}
@Override
public CompletableFuture<Boolean> tableExists(TableName tableName) {
- return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
+ return wrap(rawAdmin.tableExists(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern,
boolean includeSysTables) {
- return this.<List<TableDescriptor>> newMasterCaller()
- .action((controller, stub) -> this
- .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
- controller, stub,
- RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables),
- (s, c, req, done) -> s.getTableDescriptors(c, req, done),
- (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
- .call();
+ return wrap(rawAdmin.listTables(pattern, includeSysTables));
}
@Override
public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern,
boolean includeSysTables) {
- return this.<List<TableName>> newMasterCaller()
- .action((controller, stub) -> this
- .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub,
- RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables),
- (s, c, req, done) -> s.getTableNames(c, req, done),
- (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
- .call();
+ return wrap(rawAdmin.listTableNames(pattern, includeSysTables));
}
@Override
public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) {
- CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
- this.<List<TableSchema>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
- controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
- c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
- .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!tableSchemas.isEmpty()) {
- future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0)));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<Void> createTable(TableDescriptor desc) {
- return createTable(desc, null);
+ return wrap(rawAdmin.getTableDescriptor(tableName));
}
@Override
public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
int numRegions) {
- try {
- return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
- if (desc.getTableName() == null) {
- return failedFuture(new IllegalArgumentException("TableName cannot be null"));
- }
- if (splitKeys != null && splitKeys.length > 0) {
- Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
- // Verify there are no duplicate split keys
- byte[] lastKey = null;
- for (byte[] splitKey : splitKeys) {
- if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
- return failedFuture(new IllegalArgumentException(
- "Empty split key must not be passed in the split keys."));
- }
- if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
- return failedFuture(new IllegalArgumentException("All split keys must be unique, "
- + "found duplicate: " + Bytes.toStringBinary(splitKey) + ", "
- + Bytes.toStringBinary(lastKey)));
- }
- lastKey = splitKey;
- }
- }
+ return wrap(rawAdmin.createTable(desc, startKey, endKey, numRegions));
+ }
- return this.<CreateTableRequest, CreateTableResponse> procedureCall(
- RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
- new CreateTableProcedureBiConsumer(this, desc.getTableName()));
+ @Override
+ public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) {
+ return wrap(rawAdmin.createTable(desc, splitKeys));
}
@Override
public CompletableFuture<Void> deleteTable(TableName tableName) {
- return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
- .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
- new DeleteTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.deleteTable(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) {
- return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
+ return wrap(rawAdmin.deleteTables(pattern));
}
@Override
public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
- return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
- RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
- (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.truncateTable(tableName, preserveSplits));
}
@Override
public CompletableFuture<Void> enableTable(TableName tableName) {
- return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
- .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
- new EnableTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.enableTable(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) {
- return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
+ return wrap(rawAdmin.enableTables(pattern));
}
@Override
public CompletableFuture<Void> disableTable(TableName tableName) {
- return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
- .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
- new DisableTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.disableTable(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) {
- return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
+ return wrap(rawAdmin.disableTables(pattern));
}
@Override
public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (state.isPresent()) {
- future.complete(state.get().inStates(TableState.State.ENABLED));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName));
- }
- });
- return future;
+ return wrap(rawAdmin.isTableEnabled(tableName));
}
@Override
public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (state.isPresent()) {
- future.complete(state.get().inStates(TableState.State.DISABLED));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName));
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
- return isTableAvailable(tableName, null);
+ return wrap(rawAdmin.isTableDisabled(tableName));
}
@Override
public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- isTableEnabled(tableName).whenComplete(
- (enabled, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!enabled) {
- future.complete(false);
- } else {
- AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
- .whenComplete(
- (locations, error1) -> {
- if (error1 != null) {
- future.completeExceptionally(error1);
- return;
- }
- int notDeployed = 0;
- int regionCount = 0;
- for (HRegionLocation location : locations) {
- HRegionInfo info = location.getRegionInfo();
- if (location.getServerName() == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " has not deployed region "
- + info.getEncodedName());
- }
- notDeployed++;
- } else if (splitKeys != null
- && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
- for (byte[] splitKey : splitKeys) {
- // Just check if the splitkey is available
- if (Bytes.equals(info.getStartKey(), splitKey)) {
- regionCount++;
- break;
- }
- }
- } else {
- // Always empty start row should be counted
- regionCount++;
- }
- }
- if (notDeployed > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
- }
- future.complete(false);
- } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " expected to have "
- + (splitKeys.length + 1) + " regions, but only " + regionCount
- + " available");
- }
- future.complete(false);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " should be available");
- }
- future.complete(true);
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.isTableAvailable(tableName, splitKeys));
}
@Override
public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
- return this
- .<Pair<Integer, Integer>>newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
- controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
- c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
- resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
+ return wrap(rawAdmin.getAlterStatus(tableName));
}
@Override
- public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
- return this.<AddColumnRequest, AddColumnResponse> procedureCall(
- RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
- new AddColumnFamilyProcedureBiConsumer(this, tableName));
+ public CompletableFuture<Void> addColumnFamily(TableName tableName,
+ ColumnFamilyDescriptor columnFamily) {
+ return wrap(rawAdmin.addColumnFamily(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
- return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
- RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
- (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.deleteColumnFamily(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
ColumnFamilyDescriptor columnFamily) {
- return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
- RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
- (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.modifyColumnFamily(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
- return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
- RequestConverter.buildCreateNamespaceRequest(descriptor),
- (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
- new CreateNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ return wrap(rawAdmin.createNamespace(descriptor));
}
@Override
public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
- return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
- RequestConverter.buildModifyNamespaceRequest(descriptor),
- (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
- new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ return wrap(rawAdmin.modifyNamespace(descriptor));
}
@Override
public CompletableFuture<Void> deleteNamespace(String name) {
- return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
- RequestConverter.buildDeleteNamespaceRequest(name),
- (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
- new DeleteNamespaceProcedureBiConsumer(this, name));
+ return wrap(rawAdmin.deleteNamespace(name));
}
@Override
public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
- return this
- .<NamespaceDescriptor> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
- controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
- req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
- .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+ return wrap(rawAdmin.getNamespaceDescriptor(name));
}
@Override
public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
- return this
- .<List<NamespaceDescriptor>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
- controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
- done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
- .toNamespaceDescriptorList(resp))).call();
+ return wrap(rawAdmin.listNamespaceDescriptors());
}
@Override
- public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
- stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
- (s, c, req, done) -> s.setBalancerRunning(c, req, done),
- (resp) -> resp.getPrevBalanceValue())).call();
+ public CompletableFuture<Boolean> setBalancerOn(boolean on) {
+ return wrap(rawAdmin.setBalancerOn(on));
}
@Override
public CompletableFuture<Boolean> balance(boolean forcible) {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
- stub, RequestConverter.buildBalanceRequest(forcible),
- (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+ return wrap(rawAdmin.balance(forcible));
}
@Override
public CompletableFuture<Boolean> isBalancerOn() {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
- controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
- (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
- .call();
+ return wrap(rawAdmin.isBalancerOn());
}
@Override
public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete((location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName();
- if (server == null) {
- future.completeExceptionally(new NotServingRegionException(regionName));
- } else {
- closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(result);
- }
- });
- }
- });
- return future;
- }
-
- private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) {
- return this
- .<Boolean> newAdminCaller()
- .action(
- (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
- controller, stub,
- ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()),
- (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed()))
- .serverName(serverName).call();
+ return wrap(rawAdmin.closeRegion(regionName, serverName));
}
@Override
- public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
- return this.<List<HRegionInfo>> newAdminCaller()
- .action((controller, stub) -> this
- .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
- controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
- (s, c, req, done) -> s.getOnlineRegion(c, req, done),
- resp -> ProtobufUtil.getRegionInfos(resp)))
- .serverName(sn).call();
+ public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) {
+ return wrap(rawAdmin.getOnlineRegions(serverName));
}
@Override
public CompletableFuture<Void> flush(TableName tableName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (!exists) {
- future.completeExceptionally(new TableNotFoundException(tableName));
- } else {
- isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (!tableEnabled) {
- future.completeExceptionally(new TableNotEnabledException(tableName));
- } else {
- execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
- new HashMap<>()).whenComplete((ret, err3) -> {
- if (err3 != null) {
- future.completeExceptionally(err3);
- } else {
- future.complete(ret);
- }
- });
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.flush(tableName));
}
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
-
- HRegionInfo regionInfo = location.getRegionInfo();
- this.<Void> newAdminCaller()
- .serverName(serverName)
- .action(
- (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
- controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
- resp -> null)).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.flushRegion(regionName));
}
@Override
public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) {
- return compact(tableName, columnFamily, false, CompactType.NORMAL);
+ return wrap(rawAdmin.compact(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
- return compactRegion(regionName, columnFamily, false);
+ return wrap(rawAdmin.compactRegion(regionName, columnFamily));
}
@Override
public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) {
- return compact(tableName, columnFamily, true, CompactType.NORMAL);
+ return wrap(rawAdmin.majorCompact(tableName, columnFamily));
}
@Override
- public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
- return compactRegion(regionName, columnFamily, true);
+ public CompletableFuture<Void>
+ majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+ return wrap(rawAdmin.majorCompactRegion(regionName, columnFamily));
}
@Override
- public CompletableFuture<Void> compactRegionServer(ServerName sn) {
- return compactRegionServer(sn, false);
+ public CompletableFuture<Void> compactRegionServer(ServerName serverName) {
+ return wrap(rawAdmin.compactRegionServer(serverName));
}
@Override
- public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
- return compactRegionServer(sn, true);
- }
-
- private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
- if (hRegionInfos != null) {
- hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty())));
- }
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily,
- boolean major) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- /**
- * List all region locations for the specific table.
- */
- private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- // For meta table, we use zk to fetch all locations.
- AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
- registry.getMetaRegionLocation().whenComplete(
- (metaRegions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (metaRegions == null || metaRegions.isEmpty()
- || metaRegions.getDefaultRegionLocation() == null) {
- future.completeExceptionally(new IOException("meta region does not found"));
- } else {
- future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
- }
- // close the registry.
- IOUtils.closeQuietly(registry);
- });
- return future;
- } else {
- // For non-meta table, we fetch all locations by scanning hbase:meta table
- return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
- }
- }
-
- /**
- * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
- */
- private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily,
- final boolean major, CompactType compactType) {
- if (CompactType.MOB.equals(compactType)) {
- // TODO support MOB compact.
- return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
- }
- CompletableFuture<Void> future = new CompletableFuture<>();
- getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
- for (HRegionLocation location : locations) {
- if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
- if (location.getServerName() == null) continue;
- compactFutures
- .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
- }
- // future complete unless all of the compact futures are completed.
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- /**
- * Compact the region at specific region server.
- */
- private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
- final boolean major, Optional<byte[]> columnFamily) {
- return this
- .<Void> newAdminCaller()
- .serverName(sn)
- .action(
- (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
- controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
- major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
- resp -> null)).call();
- }
-
- private byte[] toEncodeRegionName(byte[] regionName) {
- try {
- return HRegionInfo.isEncodedRegionName(regionName) ? regionName
- : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
- } catch (IOException e) {
- return regionName;
- }
- }
-
- private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
- CompletableFuture<TableName> result) {
- getRegionLocation(encodeRegionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- return;
- }
- HRegionInfo regionInfo = location.getRegionInfo();
- if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
- result.completeExceptionally(new IllegalArgumentException(
- "Can't invoke merge on non-default regions directly"));
- return;
- }
- if (!tableName.compareAndSet(null, regionInfo.getTable())) {
- if (!tableName.get().equals(regionInfo.getTable())) {
- // tables of this two region should be same.
- result.completeExceptionally(new IllegalArgumentException(
- "Cannot merge regions from two different tables " + tableName.get() + " and "
- + regionInfo.getTable()));
- } else {
- result.complete(tableName.get());
- }
- }
- });
- }
-
- private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
- byte[] encodeRegionNameB) {
- AtomicReference<TableName> tableNameRef = new AtomicReference<>();
- CompletableFuture<TableName> future = new CompletableFuture<>();
-
- checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
- checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
- return future;
+ public CompletableFuture<Void> majorCompactRegionServer(ServerName serverName) {
+ return wrap(rawAdmin.majorCompactRegionServer(serverName));
}
@Override
public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
boolean forcible) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
- final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
-
- checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
- .whenComplete((tableName, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
-
- MergeTableRegionsRequest request = null;
- try {
- request = RequestConverter.buildMergeTableRegionsRequest(
- new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
- ng.newNonce());
- } catch (DeserializationException e) {
- future.completeExceptionally(e);
- return;
- }
-
- this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
- (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
- new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
-
- });
- return future;
+ return wrap(rawAdmin.mergeRegions(nameOfRegionA, nameOfRegionB, forcible));
}
@Override
public CompletableFuture<Void> split(TableName tableName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exist, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!exist) {
- future.completeExceptionally(new TableNotFoundException(tableName));
- return;
- }
- metaTable
- .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
- .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
- .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
- .whenComplete((results, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (results != null && !results.isEmpty()) {
- List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
- for (Result r : results) {
- if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
- RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
- if (rl != null) {
- for (HRegionLocation h : rl.getRegionLocations()) {
- if (h != null && h.getServerName() != null) {
- HRegionInfo hri = h.getRegionInfo();
- if (hri == null || hri.isSplitParent()
- || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
- continue;
- splitFutures.add(split(h.getServerName(), hri, Optional.empty()));
- }
- }
- }
- }
- CompletableFuture
- .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
- .whenComplete((ret, exception) -> {
- if (exception != null) {
- future.completeExceptionally(exception);
- return;
- }
- future.complete(ret);
- });
- } else {
- future.complete(null);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.split(tableName));
}
@Override
public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
- CompletableFuture<Void> result = new CompletableFuture<>();
- if (splitPoint == null) {
- return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
- }
- connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
- .whenComplete((loc, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- } else if (loc == null || loc.getRegionInfo() == null) {
- result.completeExceptionally(new IllegalArgumentException(
- "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
- } else {
- splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- result.completeExceptionally(err2);
- } else {
- result.complete(ret);
- }
-
- });
- }
- });
- return result;
+ return wrap(rawAdmin.split(tableName, splitPoint));
}
@Override
public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- HRegionInfo regionInfo = location.getRegionInfo();
- if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
- future.completeExceptionally(new IllegalArgumentException(
- "Can't split replicas directly. "
- + "Replicas are auto-split when their primary is split."));
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
- Optional<byte[]> splitPoint) {
- if (hri.getStartKey() != null && splitPoint.isPresent()
- && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
- return failedFuture(new IllegalArgumentException(
- "should not give a splitkey which equals to startkey!"));
- }
- return this
- .<Void> newAdminCaller()
- .action(
- (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
- controller, stub,
- ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
- (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
- .serverName(sn).call();
+ return wrap(rawAdmin.splitRegion(regionName, splitPoint));
}
@Override
public CompletableFuture<Void> assign(byte[] regionName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
- controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.assign(regionName));
}
@Override
public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this
- .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
- RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
- (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.unassign(regionName, forcible));
}
@Override
public CompletableFuture<Void> offline(byte[] regionName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
- controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.offline(regionName));
}
@Override
public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
- controller, stub, RequestConverter.buildMoveRegionRequest(
- regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s
- .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.move(regionName, destServerName));
}
@Override
public CompletableFuture<Void> setQuota(QuotaSettings quota) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
- stub, QuotaSettings.buildSetQuotaRequestProto(quota),
- (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.setQuota(quota));
}
@Override
public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
- CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
- Scan scan = QuotaTableUtil.makeScan(filter);
- this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
- .scan(scan, new RawScanResultConsumer() {
- List<QuotaSettings> settings = new ArrayList<>();
-
- @Override
- public void onNext(Result[] results, ScanController controller) {
- for (Result result : results) {
- try {
- QuotaTableUtil.parseResultToCollection(result, settings);
- } catch (IOException e) {
- controller.terminate();
- future.completeExceptionally(e);
- }
- }
- }
-
- @Override
- public void onError(Throwable error) {
- future.completeExceptionally(error);
- }
-
- @Override
- public void onComplete() {
- future.complete(settings);
- }
- });
- return future;
- }
-
- public CompletableFuture<Void> addReplicationPeer(String peerId,
- ReplicationPeerConfig peerConfig) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
- RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
- done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.getQuota(filter));
+ }
+
+ @Override
+ public CompletableFuture<Void>
+ addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) {
+ return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig));
}
@Override
public CompletableFuture<Void> removeReplicationPeer(String peerId) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
- stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
- (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.removeReplicationPeer(peerId));
}
@Override
public CompletableFuture<Void> enableReplicationPeer(String peerId) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
- stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
- (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.enableReplicationPeer(peerId));
}
@Override
public CompletableFuture<Void> disableReplicationPeer(String peerId) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
- controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
- c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
- .call();
+ return wrap(rawAdmin.disableReplicationPeer(peerId));
}
+ @Override
public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
- return this
- .<ReplicationPeerConfig> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
- controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
- s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
- (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+ return wrap(rawAdmin.getReplicationPeerConfig(peerId));
}
@Override
public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
- controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
- peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
- resp) -> null)).call();
+ return wrap(rawAdmin.updateReplicationPeerConfig(peerId, peerConfig));
}
@Override
- public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+ public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs) {
- if (tableCfs == null) {
- return failedFuture(new ReplicationException("tableCfs is null"));
- }
-
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
- if (!completeExceptionally(future, error)) {
- ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
- updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
- if (!completeExceptionally(future, error)) {
- future.complete(result);
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs));
}
@Override
- public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+ public CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs) {
- if (tableCfs == null) {
- return failedFuture(new ReplicationException("tableCfs is null"));
- }
-
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
- if (!completeExceptionally(future, error)) {
- try {
- ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
- } catch (ReplicationException e) {
- future.completeExceptionally(e);
- return;
- }
- updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
- if (!completeExceptionally(future, error)) {
- future.complete(result);
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.removeReplicationPeerTableCFs(peerId, tableCfs));
}
@Override
- public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) {
- return this
- .<List<ReplicationPeerDescription>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
- controller,
- stub,
- RequestConverter.buildListReplicationPeersRequest(pattern),
- (s, c, req, done) -> s.listReplicationPeers(c, req, done),
- (resp) -> resp.getPeerDescList().stream()
- .map(ReplicationSerDeHelper::toReplicationPeerDescription)
- .collect(Collectors.toList()))).call();
+ public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
+ Optional<Pattern> pattern) {
+ return wrap(rawAdmin.listReplicationPeers(pattern));
}
@Override
public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
- CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
- listTables().whenComplete(
- (tables, error) -> {
- if (!completeExceptionally(future, error)) {
- List<TableCFs> replicatedTableCFs = new ArrayList<>();
- tables.forEach(table -> {
- Map<String, Integer> cfs = new HashMap<>();
- Stream.of(table.getColumnFamilies())
- .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
- .forEach(column -> {
- cfs.put(column.getNameAsString(), column.getScope());
- });
- if (!cfs.isEmpty()) {
- replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
- }
- });
- future.complete(replicatedTableCFs);
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
- return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
- }
-
- @Override
- public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
- SnapshotType type) {
- return snapshot(new SnapshotDescription(snapshotName, tableName, type));
- }
-
- @Override
- public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
- SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
- .createHBaseProtosSnapshotDesc(snapshotDesc);
- try {
- ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- CompletableFuture<Void> future = new CompletableFuture<>();
- final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
- this.<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
- stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
- resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime = ConnectionUtils.getPauseTime(
- TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
- pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER
- .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
- }
- } );
- } else {
- future.completeExceptionally(new SnapshotCreationException("Snapshot '"
- + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
- + " ms", snapshotDesc));
- }
- }
- };
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
- });
- return future;
+ return wrap(rawAdmin.listReplicatedTableCFs());
+ }
+
+ @Override
+ public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
+ return wrap(rawAdmin.snapshot(snapshot));
}
@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
- controller,
- stub,
- IsSnapshotDoneRequest.newBuilder()
- .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
- req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
+ return wrap(rawAdmin.isSnapshotFinished(snapshot));
}
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
- boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
- HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
- HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
- return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
+ return wrap(rawAdmin.restoreSnapshot(snapshotName));
}
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- listSnapshots(Pattern.compile(snapshotName)).whenComplete(
- (snapshotDescriptions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TableName tableName = null;
- if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
- for (SnapshotDescription snap : snapshotDescriptions) {
- if (snap.getName().equals(snapshotName)) {
- tableName = snap.getTableName();
- break;
- }
- }
- }
- if (tableName == null) {
- future.completeExceptionally(new RestoreSnapshotException(
- "Unable to find the table name for snapshot=" + snapshotName));
- return;
- }
- final TableName finalTableName = tableName;
- tableExists(finalTableName)
- .whenComplete((exists, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (!exists) {
- // if table does not exist, then just clone snapshot into new table.
- completeConditionalOnFuture(future,
- internalRestoreSnapshot(snapshotName, finalTableName));
- } else {
- isTableDisabled(finalTableName).whenComplete(
- (disabled, err4) -> {
- if (err4 != null) {
- future.completeExceptionally(err4);
- } else if (!disabled) {
- future.completeExceptionally(new TableNotDisabledException(finalTableName));
- } else {
- completeConditionalOnFuture(future,
- restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
- }
- });
- }
- } );
- });
- return future;
- }
-
- private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
- boolean takeFailSafeSnapshot) {
- if (takeFailSafeSnapshot) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- // Step.1 Take a snapshot of the current state
- String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
- HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
- HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
- final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
- .replace("{snapshot.name}", snapshotName)
- .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
- .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
- LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else {
- // Step.2 Restore snapshot
- internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
- if (err2 != null) {
- // Step.3.a Something went wrong during the restore and try to rollback.
- internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
- (void3, err3) -> {
- if (err3 != null) {
- future.completeExceptionally(err3);
- } else {
- String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
- + failSafeSnapshotSnapshotName + " succeeded.";
- future.completeExceptionally(new RestoreSnapshotException(msg));
- }
- });
- } else {
- // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
- LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
- (ret3, err3) -> {
- if (err3 != null) {
- LOG.error(
- "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
- future.completeExceptionally(err3);
- } else {
- future.complete(ret3);
- }
- });
- }
- } );
- }
- } );
- return future;
- } else {
- return internalRestoreSnapshot(snapshotName, tableName);
- }
- }
-
- private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
- CompletableFuture<T> parentFuture) {
- parentFuture.whenComplete((res, err) -> {
- if (err != null) {
- dependentFuture.completeExceptionally(err);
- } else {
- dependentFuture.complete(res);
- }
- });
+ return wrap(rawAdmin.restoreSnapshot(snapshotName, takeFailSafeSnapshot));
}
@Override
public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (exists) {
- future.completeExceptionally(new TableExistsException(tableName));
- } else {
- completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
- }
- });
- return future;
- }
-
- private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
- SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
- .setName(snapshotName).setTable(tableName.getNameAsString()).build();
- try {
- ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- return waitProcedureResult(this
- .<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
- controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
- .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
- done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
- }
-
- @Override
- public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
- return this
- .<List<SnapshotDescription>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(
- controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
- done) -> s.getCompletedSnapshots(c, req, done), resp -> resp.getSnapshotsList()
- .stream().map(ProtobufUtil::createSnapshotDesc).collect(Collectors.toList())))
- .call();
- }
-
- @Override
- public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
- CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
- listSnapshots()
- .whenComplete(
- (snapshotDescList, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- if (snapshotDescList == null || snapshotDescList.isEmpty()) {
- future.complete(Collections.emptyList());
- return;
- }
- future.complete(snapshotDescList.stream()
- .filter(snap -> pattern.matcher(snap.getName()).matches())
- .collect(Collectors.toList()));
- });
- return future;
+ return wrap(rawAdmin.cloneSnapshot(snapshotName, tableName));
}
@Override
- public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
- Pattern snapshotNamePattern) {
- CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
- listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
- (tableNames, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- if (tableNames == null || tableNames.size() <= 0) {
- future.complete(Collections.emptyList());
- return;
- }
- listSnapshots(snapshotNamePattern).whenComplete(
- (snapshotDescList, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (snapshotDescList == null || snapshotDescList.isEmpty()) {
- future.complete(Collections.emptyList());
- return;
- }
- future.complete(snapshotDescList.stream()
- .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
- .collect(Collectors.toList()));
- });
- });
- return future;
+ public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) {
+ return wrap(rawAdmin.listSnapshots(pattern));
}
@Override
- public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
- return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
+ public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
+ Pattern snapshotNamePattern) {
+ return wrap(rawAdmin.listTableSnapshots(tableNamePattern, snapshotNamePattern));
}
@Override
- public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
- return deleteTableSnapshots(null, snapshotNamePattern);
+ public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
+ return wrap(rawAdmin.deleteSnapshot(snapshotName));
}
@Override
public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
Pattern snapshotNamePattern) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
- ((snapshotDescriptions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
- future.complete(null);
- return;
- }
- List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
- snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
- .add(internalDeleteSnapshot(snapDesc)));
- CompletableFuture.allOf(
- deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
- .thenAccept(v -> future.complete(v));
- }));
- return future;
- }
-
- private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
- controller,
- stub,
- DeleteSnapshotRequest.newBuilder()
- .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
- req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
+ return wrap(rawAdmin.deleteTableSnapshots(tableNamePattern, snapshotNamePattern));
}
@Override
public CompletableFuture<Void> execProcedure(String signature, String instance,
Map<String, String> props) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- ProcedureDescription procDesc =
- ProtobufUtil.buildProcedureDescription(signature, instance, props);
- this.<Long> newMasterCaller()
- .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
- controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
- (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
- .call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- lon
<TRUNCATED>