You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2020/02/11 01:05:53 UTC
[hbase] 05/11: HBASE-23648: Re-use underlying connection registry in RawAsyncHBaseAdmin (#994)
This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch HBASE-18095/client-locate-meta-no-zookeeper
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d8533f66e3fde7a2e1996f6575f0adc8b9dc4620
Author: Bharath Vissapragada <bh...@apache.org>
AuthorDate: Thu Jan 9 12:27:09 2020 -0800
HBASE-23648: Re-use underlying connection registry in RawAsyncHBaseAdmin (#994)
* HBASE-23648: Re-use underlying connection registry in RawAsyncHBaseAdmin
There is no need to create and close a new registry on demand. The other
usages of getRegistry() were audited and the code looks fine.
* Fix checkstyle issues in RawAsyncHBaseAdmin
---
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 110 +++++++++------------
1 file changed, 47 insertions(+), 63 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 3e5bea3..69bd611 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
-
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import java.io.IOException;
@@ -46,7 +45,6 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -99,14 +97,12 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
@@ -755,7 +751,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
- public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
+ public CompletableFuture<Void> addColumnFamily(
+ TableName tableName, ColumnFamilyDescriptor columnFamily) {
return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
@@ -809,10 +806,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.<NamespaceDescriptor> newMasterCaller()
.action(
(controller, stub) -> this
- .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
- controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
- req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
- .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+ .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
+ call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
+ (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
+ -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
}
@Override
@@ -830,13 +827,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
return this
- .<List<NamespaceDescriptor>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
- controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
- done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
- .toNamespaceDescriptorList(resp))).call();
+ .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
+ .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
+ List<NamespaceDescriptor>> call(controller, stub,
+ ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
+ s.listNamespaceDescriptors(c, req, done),
+ (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
}
@Override
@@ -1080,10 +1076,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
if (TableName.META_TABLE_NAME.equals(tableName)) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- // For meta table, we use zk to fetch all locations.
- ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(
- connection.getConfiguration());
- addListener(registry.getMetaRegionLocations(), (metaRegions, err) -> {
+ addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (metaRegions == null || metaRegions.isEmpty() ||
@@ -1092,8 +1085,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} else {
future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
}
- // close the registry.
- IOUtils.closeQuietly(registry);
});
return future;
} else {
@@ -1689,11 +1680,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
- .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
- controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
- (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
- (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
- .call();
+ .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
+ call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
+ (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
+ (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
}
@Override
@@ -1710,13 +1700,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
SyncReplicationState clusterState) {
- return this
- .<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
+ return this.<TransitReplicationPeerSyncReplicationStateRequest,
+ TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
clusterState),
- (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
- (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
- () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
+ (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
+ (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
+ () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
}
@Override
@@ -1786,11 +1776,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return this
.<List<ReplicationPeerDescription>> newMasterCaller()
.action(
- (controller, stub) -> this
- .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
- controller,
- stub,
- request,
+ (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
+ List<ReplicationPeerDescription>> call(controller, stub, request,
(s, c, req, done) -> s.listReplicationPeers(c, req, done),
(resp) -> resp.getPeerDescList().stream()
.map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
@@ -2299,11 +2286,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
- public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
+ public CompletableFuture<Void> decommissionRegionServers(
+ List<ServerName> servers, boolean offload) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this
.<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
- controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
+ controller, stub,
+ RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
(s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
.call();
}
@@ -2325,11 +2314,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<Void> recommissionRegionServer(ServerName server,
List<byte[]> encodedRegionNames) {
return this.<Void> newMasterCaller()
- .action((controller, stub) -> this
- .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
- stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
- (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
- .call();
+ .action((controller, stub) ->
+ this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
+ controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
+ server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
+ c, req, done), resp -> null)).call();
}
/**
@@ -2395,7 +2384,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
* Get the region info for the passed region name. The region name may be a full region name or
* encoded region name. If the region does not found, then it'll throw an UnknownRegionException
* wrapped by a {@link CompletableFuture}
- * @param regionNameOrEncodedRegionName
* @return region info, wrapped by a {@link CompletableFuture}
*/
private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
@@ -2886,10 +2874,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.<List<SecurityCapability>> newMasterCaller()
.action(
(controller, stub) -> this
- .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
- controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
- done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
- .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
+ .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
+ call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
+ (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
+ (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
+ .call();
}
@Override
@@ -3066,14 +3055,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
MajorCompactionTimestampRequest request =
MajorCompactionTimestampRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
- return this
- .<Optional<Long>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
- controller, stub, request,
- (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
- ProtobufUtil::toOptionalTimestamp)).call();
+ return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
+ this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
+ call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
+ c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
}
@Override
@@ -3213,11 +3198,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<Boolean> isBalancerEnabled() {
return this
.<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
- controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
- (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
- .call();
+ .action((controller, stub) ->
+ this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
+ stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
+ -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
}
@Override