You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/08/04 16:16:41 UTC
[hbase] branch branch-2 updated: HBASE-26150 Let region server also
carry ClientMetaService (#3550)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 620806e HBASE-26150 Let region server also carry ClientMetaService (#3550)
620806e is described below
commit 620806e3fb27947fa3b36f24921060e328f4b39d
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Aug 4 23:44:10 2021 +0800
HBASE-26150 Let region server also carry ClientMetaService (#3550)
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../client/AbstractRpcBasedConnectionRegistry.java | 272 ++++++++++++++++++++
.../hbase/client/ConnectionRegistryFactory.java | 4 +-
.../hbase/client/MasterAddressRefresher.java | 126 ---------
.../apache/hadoop/hbase/client/MasterRegistry.java | 283 +++------------------
.../hbase/client/RegistryEndpointsRefresher.java | 137 ++++++++++
.../hadoop/hbase/client/RpcConnectionRegistry.java | 99 +++++++
.../apache/hadoop/hbase/security/SecurityInfo.java | 9 +-
...s.java => TestRpcBasedRegistryHedgedReads.java} | 66 +++--
.../src/main/protobuf/Master.proto | 62 -----
.../src/main/protobuf/Registry.proto | 108 ++++++++
.../{master => }/MetaRegionLocationCache.java | 14 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 12 +-
.../hadoop/hbase/master/MasterRpcServices.java | 57 +++--
.../hadoop/hbase/regionserver/HRegionServer.java | 22 ++
.../hadoop/hbase/regionserver/RSRpcServices.java | 106 ++++++--
.../hadoop/hbase/security/HBasePolicyProvider.java | 14 +-
.../hadoop/hbase/client/FromClientSideBase.java | 2 +-
.../hbase/client/TestMasterAddressRefresher.java | 113 --------
.../hadoop/hbase/client/TestMasterRegistry.java | 16 +-
.../hbase/client/TestMetaRegionLocationCache.java | 19 +-
.../client/TestRegistryEndpointsRefresher.java | 114 +++++++++
.../hbase/client/TestRpcConnectionRegistry.java | 103 ++++++++
.../hbase/client/TestScannersFromClientSide.java | 2 +-
.../hbase/master/TestClientMetaServiceRPCs.java | 16 +-
.../zookeeper/RegionServerAddressTracker.java | 78 ++++++
.../zookeeper/TestRegionServerAddressTracker.java | 121 +++++++++
26 files changed, 1304 insertions(+), 671 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
new file mode 100644
index 0000000..6a2919e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
+
+/**
+ * Base class for rpc based connection registry implementation.
+ * <p/>
+ * The implementation needs a bootstrap node list in configuration, and then it will use the methods
+ * in {@link ClientMetaService} to refresh the connection registry end points.
+ * <p/>
+ * It also supports hedged reads, the default fan out value is 2.
+ * <p/>
+ * For the actual configuration names, see javadoc of sub classes.
+ */
+@InterfaceAudience.Private
+abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {
+
+ /** Default value for the fan out of hedged requests. **/
+ public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;
+
+ private final int hedgedReadFanOut;
+
+ // Configured list of end points to probe the meta information from.
+ private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
+
+ // RPC client used to talk to the masters.
+ private final RpcClient rpcClient;
+ private final RpcControllerFactory rpcControllerFactory;
+ private final int rpcTimeoutMs;
+
+ private final RegistryEndpointsRefresher registryEndpointRefresher;
+
+ protected AbstractRpcBasedConnectionRegistry(Configuration conf,
+ String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
+ String minRefreshIntervalSecsConfigName) throws IOException {
+ this.hedgedReadFanOut =
+ Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
+ rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+ conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
+ // this through the master registry...
+ // This is a problem as we will use the cluster id to determine the authentication method
+ rpcClient = RpcClientFactory.createClient(conf, null);
+ rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+ populateStubs(getBootstrapNodes(conf));
+ registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName,
+ minRefreshIntervalSecsConfigName, this::refreshStubs);
+ registryEndpointRefresher.start();
+ }
+
+ protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
+
+ protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
+
+ private void refreshStubs() throws IOException {
+ populateStubs(FutureUtils.get(fetchEndpoints()));
+ }
+
+ private void populateStubs(Set<ServerName> addrs) throws IOException {
+ Preconditions.checkNotNull(addrs);
+ ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+ ImmutableMap.builderWithExpectedSize(addrs.size());
+ User user = User.getCurrent();
+ for (ServerName masterAddr : addrs) {
+ builder.put(masterAddr,
+ ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
+ }
+ addr2Stub = builder.build();
+ }
+
+ /**
+ * For describing the actual asynchronous rpc call.
+ * <p/>
+ * Typically, you can use lambda expression to implement this interface as
+ *
+ * <pre>
+ * (c, s, d) -> s.xxx(c, your request here, d)
+ * </pre>
+ */
+ @FunctionalInterface
+ protected interface Callable<T> {
+ void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
+ }
+
+ private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
+ Callable<T> callable) {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ CompletableFuture<T> future = new CompletableFuture<>();
+ callable.call(controller, stub, resp -> {
+ if (controller.failed()) {
+ IOException failureReason = controller.getFailed();
+ future.completeExceptionally(failureReason);
+ if (ClientExceptionsUtil.isConnectionException(failureReason)) {
+ // RPC has failed, trigger a refresh of end points. We can have some spurious
+ // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
+ registryEndpointRefresher.refreshNow();
+ }
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
+ }
+
+ private IOException badResponse(String debug) {
+ return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
+ }
+
+ /**
+ * send requests concurrently to hedgedReadsFanout end points. If any of the requests succeeds,
+ * we will complete the future and quit. If all the requests in one round have failed, we will
+ * start another round to send requests concurrently to hedgedReadsFanout end points. If all end
+ * points have been tried and all of them have failed, we will fail the future.
+ */
+ private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
+ List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
+ Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
+ int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
+ AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
+ for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+ addListener(call(stubs.get(i), callable), (r, e) -> {
+ // a simple check to skip all the later operations earlier
+ if (future.isDone()) {
+ return;
+ }
+ if (e == null && !isValidResp.test(r)) {
+ e = badResponse(debug);
+ }
+ if (e != null) {
+ // make sure when remaining reaches 0 we have all exceptions in the errors queue
+ errors.add(e);
+ if (remaining.decrementAndGet() == 0) {
+ if (endIndexExclusive == stubs.size()) {
+ // we are done, complete the future with exception
+ RetriesExhaustedException ex =
+ new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
+ future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
+ } else {
+ groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
+ errors);
+ }
+ }
+ } else {
+ // do not need to decrement the counter any more as we have already finished the future.
+ future.complete(r);
+ }
+ });
+ }
+ }
+
+ protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
+ Predicate<T> isValidResp, String debug) {
+ ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
+ Set<ServerName> servers = addr2StubRef.keySet();
+ List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
+ Collections.shuffle(stubs, ThreadLocalRandom.current());
+ CompletableFuture<T> future = new CompletableFuture<>();
+ groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
+ new ConcurrentLinkedQueue<>());
+ return future;
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ Set<ServerName> getParsedServers() {
+ return addr2Stub.keySet();
+ }
+
+ /**
+ * Simple helper to transform the result of getMetaRegionLocations() rpc.
+ */
+ private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
+ List<HRegionLocation> regionLocations = new ArrayList<>();
+ resp.getMetaLocationsList()
+ .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
+ return new RegionLocations(regionLocations);
+ }
+
+ @Override
+ public CompletableFuture<RegionLocations> getMetaRegionLocations() {
+ return this
+ .<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
+ GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
+ "getMetaLocationsCount")
+ .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations);
+ }
+
+ @Override
+ public CompletableFuture<String> getClusterId() {
+ return this
+ .<GetClusterIdResponse> call(
+ (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
+ GetClusterIdResponse::hasClusterId, "getClusterId()")
+ .thenApply(GetClusterIdResponse::getClusterId);
+ }
+
+ @Override
+ public CompletableFuture<ServerName> getActiveMaster() {
+ return this
+ .<GetActiveMasterResponse> call(
+ (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
+ GetActiveMasterResponse::hasServerName, "getActiveMaster()")
+ .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName()));
+ }
+
+ @Override
+ public void close() {
+ if (registryEndpointRefresher != null) {
+ registryEndpointRefresher.stop();
+ }
+ if (rpcClient != null) {
+ rpcClient.close();
+ }
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 9308443..e9af7e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -35,8 +35,8 @@ final class ConnectionRegistryFactory {
* @return The connection registry implementation to use.
*/
static ConnectionRegistry getRegistry(Configuration conf) {
- Class<? extends ConnectionRegistry> clazz = conf.getClass(
- CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
+ Class<? extends ConnectionRegistry> clazz =
+ conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java
deleted file mode 100644
index 3cbb9f7..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-
-/**
- * Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
- * uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
- * By default the refresh happens periodically (configured via
- * {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
- * {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any attempts two
- * should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
- */
-@InterfaceAudience.Private
-public class MasterAddressRefresher implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
- public static final String PERIODIC_REFRESH_INTERVAL_SECS =
- "hbase.client.master_registry.refresh_interval_secs";
- private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
- public static final String MIN_SECS_BETWEEN_REFRESHES =
- "hbase.client.master_registry.min_secs_between_refreshes";
- private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
-
- private final ExecutorService pool;
- private final MasterRegistry registry;
- private final long periodicRefreshMs;
- private final long timeBetweenRefreshesMs;
- private final Object refreshMasters = new Object();
-
- @Override
- public void close() {
- pool.shutdownNow();
- }
-
- /**
- * Thread that refreshes the master end points until it is interrupted via {@link #close()}.
- * Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
- */
- private class RefreshThread implements Runnable {
- @Override
- public void run() {
- long lastRpcTs = 0;
- while (!Thread.interrupted()) {
- try {
- // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
- // have duplicate refreshes because once the thread is past the wait(), notify()s are
- // ignored until the thread is back to the waiting state.
- synchronized (refreshMasters) {
- refreshMasters.wait(periodicRefreshMs);
- }
- long currentTs = EnvironmentEdgeManager.currentTime();
- if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
- continue;
- }
- lastRpcTs = currentTs;
- LOG.debug("Attempting to refresh master address end points.");
- Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
- registry.populateMasterStubs(newMasters);
- LOG.debug("Finished refreshing master end points. {}", newMasters);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
- break;
- } catch (ExecutionException | IOException e) {
- LOG.debug("Error populating latest list of masters.", e);
- }
- }
- LOG.info("Master end point refresher loop exited.");
- }
- }
-
- MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
- pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
- periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
- PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
- timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
- MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
- Preconditions.checkArgument(periodicRefreshMs > 0);
- Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
- this.registry = registry;
- pool.submit(new RefreshThread());
- }
-
- /**
- * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
- * See class comment for details.
- */
- void refreshNow() {
- synchronized (refreshMasters) {
- refreshMasters.notify();
- }
- }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 1a46579..76477aa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -19,50 +19,27 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.util.DNS.getHostname;
-import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
-import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.util.DNS.ServerType;
+import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
@@ -70,40 +47,31 @@ import org.apache.yetus.audience.InterfaceAudience;
* <p/>
* It supports hedged reads, set the fan out of the requests batch by
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
- * it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
+ * it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
* <p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
@InterfaceAudience.Private
-public class MasterRegistry implements ConnectionRegistry {
+public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
/** Configuration key that controls the fan out of requests **/
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
"hbase.client.master_registry.hedged.fanout";
- /** Default value for the fan out of hedged requests. **/
- public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
-
- private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
+ public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
+ "hbase.client.master_registry.refresh_interval_secs";
- private final int hedgedReadFanOut;
+ public static final String MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES =
+ "hbase.client.master_registry.min_secs_between_refreshes";
- // Configured list of masters to probe the meta information from.
- private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
-
- // RPC client used to talk to the masters.
- private final RpcClient rpcClient;
- private final RpcControllerFactory rpcControllerFactory;
- private final int rpcTimeoutMs;
-
- protected final MasterAddressRefresher masterAddressRefresher;
+ private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
* separated host[:port] values. If no port number is specified, default master port is assumed.
* @param conf Configuration to parse from.
*/
- private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
+ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
Set<ServerName> masterAddrs = new HashSet<>();
String configuredMasters = getMasterAddr(conf);
for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
@@ -116,31 +84,18 @@ public class MasterRegistry implements ConnectionRegistry {
}
MasterRegistry(Configuration conf) throws IOException {
- this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
- MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
- rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
- conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
- // this through the master registry...
- // This is a problem as we will use the cluster id to determine the authentication method
- rpcClient = RpcClientFactory.createClient(conf, null);
- rpcControllerFactory = RpcControllerFactory.instantiate(conf);
- // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
- // by fetching the end points from this list.
- populateMasterStubs(parseMasterAddrs(conf));
- masterAddressRefresher = new MasterAddressRefresher(conf, this);
+ super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+ MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
}
- void populateMasterStubs(Set<ServerName> masters) throws IOException {
- Preconditions.checkNotNull(masters);
- ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
- ImmutableMap.builderWithExpectedSize(masters.size());
- User user = User.getCurrent();
- for (ServerName masterAddr : masters) {
- builder.put(masterAddr,
- ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
- }
- masterAddr2Stub = builder.build();
+ @Override
+ protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
+ return parseMasterAddrs(conf);
+ }
+
+ @Override
+ protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
+ return getMasters();
}
/**
@@ -158,186 +113,18 @@ public class MasterRegistry implements ConnectionRegistry {
return String.format("%s:%d", hostname, port);
}
- /**
- * For describing the actual asynchronous rpc call.
- * <p/>
- * Typically, you can use lambda expression to implement this interface as
- *
- * <pre>
- * (c, s, d) -> s.xxx(c, your request here, d)
- * </pre>
- */
- @FunctionalInterface
- private interface Callable<T> {
- void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
- }
-
- private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
- Callable<T> callable) {
- HBaseRpcController controller = rpcControllerFactory.newController();
- CompletableFuture<T> future = new CompletableFuture<>();
- callable.call(controller, stub, resp -> {
- if (controller.failed()) {
- IOException failureReason = controller.getFailed();
- future.completeExceptionally(failureReason);
- if (ClientExceptionsUtil.isConnectionException(failureReason)) {
- // RPC has failed, trigger a refresh of master end points. We can have some spurious
- // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
- masterAddressRefresher.refreshNow();
- }
- } else {
- future.complete(resp);
- }
- });
- return future;
- }
-
- private IOException badResponse(String debug) {
- return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
- }
-
- /**
- * send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we
- * will complete the future and quit. If all the requests in one round are failed, we will start
- * another round to send requests concurrently tohedgedReadsFanout masters. If all masters have
- * been tried and all of them are failed, we will fail the future.
- */
- private <T extends Message> void groupCall(CompletableFuture<T> future,
- Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
- int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
- ConcurrentLinkedQueue<Throwable> errors) {
- int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
- AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
- for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
- addListener(call(masterStubs.get(i), callable), (r, e) -> {
- // a simple check to skip all the later operations earlier
- if (future.isDone()) {
- return;
- }
- if (e == null && !isValidResp.test(r)) {
- e = badResponse(debug);
- }
- if (e != null) {
- // make sure when remaining reaches 0 we have all exceptions in the errors queue
- errors.add(e);
- if (remaining.decrementAndGet() == 0) {
- if (endIndexExclusive == masterStubs.size()) {
- // we are done, complete the future with exception
- RetriesExhaustedException ex = new RetriesExhaustedException("masters",
- masterStubs.size(), new ArrayList<>(errors));
- future.completeExceptionally(
- new MasterRegistryFetchException(masterServers, ex));
- } else {
- groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
- isValidResp, debug, errors);
- }
- }
- } else {
- // do not need to decrement the counter any more as we have already finished the future.
- future.complete(r);
- }
- });
- }
- }
-
- private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
- Predicate<T> isValidResp, String debug) {
- ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
- Set<ServerName> masterServers = masterAddr2StubRef.keySet();
- List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
- Collections.shuffle(masterStubs, ThreadLocalRandom.current());
- CompletableFuture<T> future = new CompletableFuture<>();
- groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
- new ConcurrentLinkedQueue<>());
- return future;
- }
-
- /**
- * Simple helper to transform the result of getMetaRegionLocations() rpc.
- */
- private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
- List<HRegionLocation> regionLocations = new ArrayList<>();
- resp.getMetaLocationsList()
- .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
- return new RegionLocations(regionLocations);
- }
-
- @Override
- public CompletableFuture<RegionLocations> getMetaRegionLocations() {
- return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
- GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
- "getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
+ private static Set<ServerName> transformServerNames(GetMastersResponse resp) {
+ return resp.getMasterServersList().stream()
+ .map(s -> ProtobufUtil.toServerName(s.getServerName())).collect(Collectors.toSet());
}
- @Override
- public CompletableFuture<String> getClusterId() {
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/(.*/MasterRegistry.java|src/test/.*)")
+ CompletableFuture<Set<ServerName>> getMasters() {
return this
- .<GetClusterIdResponse> call(
- (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
- GetClusterIdResponse::hasClusterId, "getClusterId()")
- .thenApply(GetClusterIdResponse::getClusterId);
- }
-
- private static boolean hasActiveMaster(GetMastersResponse resp) {
- List<GetMastersResponseEntry> activeMasters =
- resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
- Collectors.toList());
- return activeMasters.size() == 1;
- }
-
- private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
- List<GetMastersResponseEntry> activeMasters =
- resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
- Collectors.toList());
- if (activeMasters.size() != 1) {
- throw new IOException(String.format("Incorrect number of active masters encountered." +
- " Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
- }
- return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
- }
-
- @Override
- public CompletableFuture<ServerName> getActiveMaster() {
- CompletableFuture<ServerName> future = new CompletableFuture<>();
- addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
- MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
- if (ex != null) {
- future.completeExceptionally(ex);
- }
- ServerName result = null;
- try {
- result = filterActiveMaster((GetMastersResponse)resp);
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- future.complete(result);
- });
- return future;
- }
-
- private static List<ServerName> transformServerNames(GetMastersResponse resp) {
- return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
- s.getServerName())).collect(Collectors.toList());
- }
-
- CompletableFuture<List<ServerName>> getMasters() {
- return this
- .<GetMastersResponse> call((c, s, d) -> s.getMasters(
- c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
- "getMasters()").thenApply(MasterRegistry::transformServerNames);
- }
-
- Set<ServerName> getParsedMasterServers() {
- return masterAddr2Stub.keySet();
- }
-
- @Override
- public void close() {
- if (masterAddressRefresher != null) {
- masterAddressRefresher.close();
- }
- if (rpcClient != null) {
- rpcClient.close();
- }
+ .<GetMastersResponse> call(
+ (c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
+ r -> r.getMasterServersCount() != 0, "getMasters()")
+ .thenApply(MasterRegistry::transformServerNames);
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java
new file mode 100644
index 0000000..7eb81b0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * Thread safe utility that keeps registry end points used by {@link ConnectionRegistry} up to date.
+ * By default the refresh happens periodically (configured via {@code intervalSecsConfigName}). The
+ * refresh can also be triggered on demand via {@link #refreshNow()}. To prevent a flood of
+ * on-demand refreshes we expect that any two attempts should be spaced at least
+ * {@code minIntervalSecsConfigName} seconds apart.
+ */
+@InterfaceAudience.Private
+class RegistryEndpointsRefresher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
+
+ public static final String PERIODIC_REFRESH_INTERVAL_SECS =
+ "hbase.client.rpc_registry.refresh_interval_secs";
+ private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
+
+ public static final String MIN_SECS_BETWEEN_REFRESHES =
+ "hbase.client.rpc_registry.min_secs_between_refreshes";
+ private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
+
+ private final Thread thread;
+ private final Refresher refresher;
+ private final long periodicRefreshMs;
+ private final long minTimeBetweenRefreshesMs;
+
+ private boolean refreshNow = false;
+ private boolean stopped = false;
+
+ public void start() {
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ stopped = true;
+ notifyAll();
+ }
+
+ // The main loop for the refresh thread.
+ private void mainLoop() {
+ long lastRefreshTime = EnvironmentEdgeManager.currentTime();
+ for (;;) {
+ synchronized (this) {
+ for (;;) {
+ if (stopped) {
+ LOG.info("Registry end points refresher loop exited.");
+ return;
+ }
+ // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
+ // otherwise wait until periodicRefreshMs elapsed
+ long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
+ (EnvironmentEdgeManager.currentTime() - lastRefreshTime);
+ if (waitTime <= 0) {
+ break;
+ }
+ try {
+ wait(waitTime);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted during wait", e);
+ Thread.currentThread().interrupt();
+ continue;
+ }
+ }
+ // we are going to refresh, reset this flag
+ refreshNow = false;
+ }
+ LOG.debug("Attempting to refresh registry end points");
+ try {
+ refresher.refresh();
+ } catch (IOException e) {
+ LOG.warn("Error refresh registry end points", e);
+ }
+ // We do not think it is a big deal to fail one time, so no matter what is refresh result, we
+ // just update this refresh time and wait for the next round. If later this becomes critical,
+ // could change to only update this value when we have done a successful refreshing.
+ lastRefreshTime = EnvironmentEdgeManager.currentTime();
+ LOG.debug("Finished refreshing registry end points");
+ }
+ }
+
+ @FunctionalInterface
+ public interface Refresher {
+
+ void refresh() throws IOException;
+ }
+
+ RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName,
+ String minIntervalSecsConfigName, Refresher refresher) {
+ periodicRefreshMs = TimeUnit.SECONDS
+ .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
+ minTimeBetweenRefreshesMs = TimeUnit.SECONDS
+ .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
+ Preconditions.checkArgument(periodicRefreshMs > 0);
+ Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
+ thread = new Thread(this::mainLoop);
+ thread.setName("Registry-endpoints-refresh-end-points");
+ thread.setDaemon(true);
+ this.refresher = refresher;
+ }
+
+ /**
+ * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
+ * See class comment for details.
+ */
+ synchronized void refreshNow() {
+ refreshNow = true;
+ notifyAll();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
new file mode 100644
index 0000000..0096bfc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
+
+/**
+ * Rpc based connection registry. It will make use of the {@link ClientMetaService} to get registry
+ * information.
+ * <p/>
+ * It needs bootstrap node list when start up, and then it will use {@link ClientMetaService} to
+ * refresh the bootstrap node list periodically.
+ * <p/>
+ * Usually, you could set masters as the bootstrap nodes, as they will also implement the
+ * {@link ClientMetaService}, and then, we will switch to use region servers after refreshing the
+ * bootstrap nodes.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
+
+ /** Configuration key that controls the fan out of requests **/
+ public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout";
+
+ public static final String PERIODIC_REFRESH_INTERVAL_SECS =
+ "hbase.client.bootstrap.refresh_interval_secs";
+
+ public static final String MIN_SECS_BETWEEN_REFRESHES =
+ "hbase.client.bootstrap.min_secs_between_refreshes";
+
+ public static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers";
+
+ private static final char ADDRS_CONF_SEPARATOR = ',';
+
+ RpcConnectionRegistry(Configuration conf) throws IOException {
+ super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
+ }
+
+ @Override
+ protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
+ // try get bootstrap nodes config first
+ String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
+ if (!StringUtils.isBlank(configuredBootstrapNodes)) {
+ return Splitter.on(ADDRS_CONF_SEPARATOR).trimResults().splitToStream(configuredBootstrapNodes)
+ .map(addr -> ServerName.valueOf(addr, ServerName.NON_STARTCODE))
+ .collect(Collectors.toSet());
+ } else {
+ // otherwise, just use master addresses
+ return MasterRegistry.parseMasterAddrs(conf);
+ }
+ }
+
+ private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
+ return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
+ .collect(Collectors.toSet());
+ }
+
+ private CompletableFuture<Set<ServerName>> getBootstrapNodes() {
+ return this
+ .<GetBootstrapNodesResponse> call(
+ (c, s, d) -> s.getBootstrapNodes(c, GetBootstrapNodesRequest.getDefaultInstance(), d),
+ r -> r.getServerNameCount() != 0, "getBootstrapNodes()")
+ .thenApply(RpcConnectionRegistry::transformServerNames);
+ }
+
+ @Override
+ protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
+ return getBootstrapNodes();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index e5b4de2..a2f086f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -19,14 +19,15 @@ package org.apache.hadoop.hbase.security;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos;
/**
* Maps RPC protocol interfaces to required configuration
@@ -49,7 +50,7 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.HbckService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
- infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
+ infos.put(RegistryProtos.ClientMetaService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
similarity index 78%
rename from hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
rename to hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
index 40a38c7..dacfbd1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
@@ -33,7 +33,6 @@ import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@@ -58,22 +57,30 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
@Category({ ClientTests.class, SmallTests.class })
-public class TestMasterRegistryHedgedReads {
+public class TestRpcBasedRegistryHedgedReads {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
+ HBaseClassTestRule.forClass(TestRpcBasedRegistryHedgedReads.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
+
+ private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
+ private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
+ "hbase.test.refresh.interval.secs";
+ private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
+ "hbase.test.min.refresh.interval.secs";
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
private static final ExecutorService EXECUTOR =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
+ private static Set<ServerName> BOOTSTRAP_NODES;
+
private static AtomicInteger CALLED = new AtomicInteger(0);
private static volatile int BAD_RESP_INDEX;
@@ -142,14 +149,35 @@ public class TestMasterRegistryHedgedReads {
}
}
+ private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
+ return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
+ REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
+
+ @Override
+ protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
+ return BOOTSTRAP_NODES;
+ }
+
+ @Override
+ protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
+ return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
+ }
+ };
+ }
+
@BeforeClass
public static void setUpBeforeClass() {
Configuration conf = UTIL.getConfiguration();
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
RpcClient.class);
- String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
- .collect(Collectors.joining(","));
- conf.set(HConstants.MASTER_ADDRS_KEY, masters);
+ // disable refresh, we do not need to refresh in this test
+ conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
+ conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
+ BOOTSTRAP_NODES = IntStream.range(0, 10)
+ .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE))
+ .collect(Collectors.toSet());
}
@AfterClass
@@ -175,9 +203,7 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testAllFailNoHedged() throws IOException {
- Configuration conf = UTIL.getConfiguration();
- conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
- try (MasterRegistry registry = new MasterRegistry(conf)) {
+ try (AbstractRpcBasedConnectionRegistry registry = createRegistry(1)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
@@ -185,10 +211,8 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testAllFailHedged3() throws IOException {
- Configuration conf = UTIL.getConfiguration();
- conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
BAD_RESP_INDEX = 5;
- try (MasterRegistry registry = new MasterRegistry(conf)) {
+ try (AbstractRpcBasedConnectionRegistry registry = createRegistry(3)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
@@ -196,12 +220,10 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testFirstSucceededNoHedge() throws IOException {
- Configuration conf = UTIL.getConfiguration();
- // will be set to 1
- conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
GOOD_RESP_INDEXS =
IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
- try (MasterRegistry registry = new MasterRegistry(conf)) {
+ // will be set to 1
+ try (AbstractRpcBasedConnectionRegistry registry = createRegistry(0)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
assertEquals(1, CALLED.get());
@@ -210,10 +232,8 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testSecondRoundSucceededHedge4() throws IOException {
- Configuration conf = UTIL.getConfiguration();
- conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
GOOD_RESP_INDEXS = Collections.singleton(6);
- try (MasterRegistry registry = new MasterRegistry(conf)) {
+ try (AbstractRpcBasedConnectionRegistry registry = createRegistry(4)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 8);
@@ -222,10 +242,8 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
- Configuration conf = UTIL.getConfiguration();
- conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
GOOD_RESP_INDEXS = Collections.singleton(5);
- try (MasterRegistry registry = new MasterRegistry(conf)) {
+ try (AbstractRpcBasedConnectionRegistry registry = createRegistry(Integer.MAX_VALUE)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 10);
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index 5e2e6ce..045feb4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -1251,65 +1251,3 @@ service HbckService {
rpc FixMeta(FixMetaRequest)
returns(FixMetaResponse);
}
-
-/** Request and response to get the clusterID for this cluster */
-message GetClusterIdRequest {
-}
-message GetClusterIdResponse {
- /** Not set if cluster ID could not be determined. */
- optional string cluster_id = 1;
-}
-
-/** Request and response to get the currently active master name for this cluster */
-message GetActiveMasterRequest {
-}
-message GetActiveMasterResponse {
- /** Not set if an active master could not be determined. */
- optional ServerName server_name = 1;
-}
-
-/** Request and response to get the current list of all registers master servers */
-message GetMastersRequest {
-}
-message GetMastersResponseEntry {
- required ServerName server_name = 1;
- required bool is_active = 2;
-}
-message GetMastersResponse {
- repeated GetMastersResponseEntry master_servers = 1;
-}
-
-/** Request and response to get the current list of meta region locations */
-message GetMetaRegionLocationsRequest {
-}
-message GetMetaRegionLocationsResponse {
- /** Not set if meta region locations could not be determined. */
- repeated RegionLocation meta_locations = 1;
-}
-
-/**
- * Implements all the RPCs needed by clients to look up cluster meta information needed for
- * connection establishment.
- */
-service ClientMetaService {
- /**
- * Get Cluster ID for this cluster.
- */
- rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
-
- /**
- * Get active master server name for this cluster. Retained for out of sync client and master
- * rolling upgrades. Newer clients switched to GetMasters RPC request.
- */
- rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
-
- /**
- * Get registered list of master servers in this cluster.
- */
- rpc GetMasters(GetMastersRequest) returns(GetMastersResponse);
-
- /**
- * Get current meta replicas' region locations.
- */
- rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
-}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Registry.proto b/hbase-protocol-shaded/src/main/protobuf/Registry.proto
new file mode 100644
index 0000000..8dd0d1a
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/Registry.proto
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+// The protos for ConnectionRegistry.
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "RegistryProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "HBase.proto";
+
+/** Request and response to get the clusterID for this cluster */
+message GetClusterIdRequest {
+}
+message GetClusterIdResponse {
+ /** Not set if cluster ID could not be determined. */
+ optional string cluster_id = 1;
+}
+
+/** Request and response to get the currently active master name for this cluster */
+message GetActiveMasterRequest {
+}
+message GetActiveMasterResponse {
+ /** Not set if an active master could not be determined. */
+ optional ServerName server_name = 1;
+}
+
+/** Request and response to get the current list of all registered master servers */
+message GetMastersRequest {
+ option deprecated = true;
+}
+message GetMastersResponseEntry {
+ option deprecated = true;
+ required ServerName server_name = 1;
+ required bool is_active = 2;
+}
+message GetMastersResponse {
+ option deprecated = true;
+ repeated GetMastersResponseEntry master_servers = 1;
+}
+
+/** Request and response to get the current list of meta region locations */
+message GetMetaRegionLocationsRequest {
+}
+message GetMetaRegionLocationsResponse {
+ /** Not set if meta region locations could not be determined. */
+ repeated RegionLocation meta_locations = 1;
+}
+
+/** Request and response to get the nodes which could be used as ClientMetaService */
+message GetBootstrapNodesRequest {
+}
+message GetBootstrapNodesResponse {
+ repeated ServerName server_name = 1;
+}
+
+/**
+ * Implements all the RPCs needed by clients to look up cluster meta information needed for
+ * connection establishment.
+ */
+service ClientMetaService {
+ /**
+ * Get Cluster ID for this cluster.
+ */
+ rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
+
+ /**
+ * Get active master server name for this cluster. Retained for out of sync client and master
+ * rolling upgrades. Newer clients switched to GetMasters RPC request.
+ */
+ rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
+
+ /**
+ * Get registered list of master servers in this cluster.
+ */
+ rpc GetMasters(GetMastersRequest) returns(GetMastersResponse) {
+ option deprecated = true;
+ };
+
+ /**
+ * Get current meta replicas' region locations.
+ */
+ rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
+
+ /**
+ * Get nodes which could be used as ClientMetaService
+ */
+ rpc GetBootstrapNodes(GetBootstrapNodesRequest) returns (GetBootstrapNodesResponse);
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java
similarity index 96%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java
index 07512d1..b294f7be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java
@@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.master;
+package org.apache.hadoop.hbase;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ThreadFactory;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
@@ -35,7 +35,9 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@@ -87,10 +89,10 @@ public class MetaRegionLocationCache extends ZKListener {
// are established. Subsequent updates are handled by the registered listener. Also, this runs
// in a separate thread in the background to not block master init.
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
- RetryCounterFactory retryFactory = new RetryCounterFactory(
- Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
- threadFactory.newThread(
- ()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
+ RetryCounterFactory retryFactory = new RetryCounterFactory(Integer.MAX_VALUE,
+ SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
+ threadFactory.newThread(() -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT))
+ .start();
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 66ca847..8b1638a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.PleaseRestartMasterException;
@@ -302,12 +303,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// manager of assignment nodes in zookeeper
private AssignmentManager assignmentManager;
- /**
- * Cache for the meta region replica's locations. Also tracks their changes to avoid stale
- * cache entries.
- */
- private final MetaRegionLocationCache metaRegionLocationCache;
-
// manager of replication
private ReplicationPeerManager replicationPeerManager;
@@ -474,7 +469,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
- this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
cachedClusterId = new CachedClusterId(this, conf);
@@ -3793,10 +3787,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return cachedClusterId.getFromCacheOrFetch();
}
- public MetaRegionLocationCache getMetaRegionLocationCache() {
- return this.metaRegionLocationCache;
- }
-
/**
* Get the compaction state of the table
*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 1c3137f..da40aaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.Server;
@@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.janitor.MetaFixer;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -178,7 +176,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
@@ -203,21 +200,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
@@ -354,6 +342,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@@ -2986,9 +2984,11 @@ public class MasterRpcServices extends RSRpcServices implements
return true;
}
+ // Override this method since for backup master we will not set the clusterId field, which means
+ // we need to find another way to get cluster id for backup masters.
@Override
public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
- throws ServiceException {
+ throws ServiceException {
GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
String clusterId = master.getClusterId();
if (clusterId != null) {
@@ -2997,40 +2997,43 @@ public class MasterRpcServices extends RSRpcServices implements
return resp.build();
}
+ // Override this method since we use ActiveMasterManager to get active master on HMaster while in
+ // HRegionServer we use MasterAddressTracker
@Override
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
- GetActiveMasterRequest request) throws ServiceException {
+ GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
return resp.build();
}
+ // Override this method since we use ActiveMasterManager to get backup masters on HMaster while in
+ // HRegionServer we use MasterAddressTracker
@Override
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
- throws ServiceException {
+ throws ServiceException {
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
// Active master
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
- .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
+ .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
// Backup masters
- for (ServerName backupMaster: master.getBackupMasters()) {
- resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
- ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
+ for (ServerName backupMaster : master.getBackupMasters()) {
+ resp.addMasterServers(GetMastersResponseEntry.newBuilder()
+ .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
return resp.build();
}
@Override
- public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
- GetMetaRegionLocationsRequest request) throws ServiceException {
- GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder();
- Optional<List<HRegionLocation>> metaLocations =
- master.getMetaRegionLocationCache().getMetaRegionLocations();
- metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach(
- location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
- return response.build();
+ public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
+ GetBootstrapNodesRequest request) throws ServiceException {
+ GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
+ for (ServerName sn : master.getServerManager().getOnlineServers().keySet()) {
+ builder.addServerName(ProtobufUtil.toServerName(sn));
+ }
+ return builder.build();
}
@Override
public HBaseProtos.LogEntry getLogEntries(RpcController controller,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5582702..f30a8cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HealthCheckChore;
+import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
@@ -181,6 +182,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -427,6 +429,16 @@ public class HRegionServer extends Thread implements
// master address tracker
private final MasterAddressTracker masterAddressTracker;
+ /**
+ * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
+ * entries. Used for serving ClientMetaService.
+ */
+ private final MetaRegionLocationCache metaRegionLocationCache;
+ /**
+ * Cache for all the region servers in the cluster. Used for serving ClientMetaService.
+ */
+ private final RegionServerAddressTracker regionServerAddressTracker;
+
// Cluster Status Tracker
protected final ClusterStatusTracker clusterStatusTracker;
@@ -677,6 +689,8 @@ public class HRegionServer extends Thread implements
clusterStatusTracker = null;
}
this.rpcServices.start(zooKeeper);
+ this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
+ this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
// This violates 'no starting stuff in Constructor' but Master depends on the below chore
// and executor being created and takes a different startup route. Lots of overlap between HRS
// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
@@ -3970,4 +3984,12 @@ public class HRegionServer extends Thread implements
public long getRetryPauseTime() {
return this.retryPauseTime;
}
+
+ public MetaRegionLocationCache getMetaRegionLocationCache() {
+ return this.metaRegionLocationCache;
+ }
+
+ RegionServerAddressTracker getRegionServerAddressTracker() {
+ return regionServerAddressTracker;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 92a10c8..06e7ccf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -156,6 +157,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -254,6 +256,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -266,8 +280,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
- AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
- ConfigurationObserver {
+ AdminService.BlockingInterface, ClientService.BlockingInterface,
+ ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver {
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
/** RPC scheduler to use for the region server. */
@@ -373,9 +387,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* where you would ever turn off one or the other).
*/
public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
- "hbase.regionserver.admin.executorService";
+ "hbase.regionserver.admin.executorService";
public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
- "hbase.regionserver.client.executorService";
+ "hbase.regionserver.client.executorService";
+ public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
+ "hbase.regionserver.client.meta.executorService";
/**
* An Rpc callback for closing a RegionScanner.
@@ -1578,23 +1594,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* supports
*/
protected List<BlockingServiceAndInterface> getServices() {
- boolean admin =
- getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
- boolean client =
- getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
+ boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
+ boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
+ boolean clientMeta =
+ getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
List<BlockingServiceAndInterface> bssi = new ArrayList<>();
if (client) {
- bssi.add(new BlockingServiceAndInterface(
- ClientService.newReflectiveBlockingService(this),
- ClientService.BlockingInterface.class));
+ bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
+ ClientService.BlockingInterface.class));
}
if (admin) {
- bssi.add(new BlockingServiceAndInterface(
- AdminService.newReflectiveBlockingService(this),
- AdminService.BlockingInterface.class));
+ bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
+ AdminService.BlockingInterface.class));
}
- return new org.apache.hbase.thirdparty.com.google.common.collect.
- ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
+ if (clientMeta) {
+ bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
+ ClientMetaService.BlockingInterface.class));
+ }
+ return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
}
public InetSocketAddress getSocketAddress() {
@@ -4008,4 +4025,61 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
protected ZKPermissionWatcher getZkPermissionWatcher() {
return zkPermissionWatcher;
}
+
+ @Override
+ public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
+ throws ServiceException {
+ return GetClusterIdResponse.newBuilder().setClusterId(regionServer.getClusterId()).build();
+ }
+
+ @Override
+ public GetActiveMasterResponse getActiveMaster(RpcController controller,
+ GetActiveMasterRequest request) throws ServiceException {
+ GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
+ ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
+ if (activeMaster != null) {
+ builder.setServerName(ProtobufUtil.toServerName(activeMaster));
+ }
+ return builder.build();
+ }
+
+ @Override
+ public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
+ throws ServiceException {
+ try {
+ GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
+ ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
+ if (activeMaster != null) {
+ builder.addMasterServers(GetMastersResponseEntry.newBuilder()
+ .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true));
+ }
+ for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) {
+ builder.addMasterServers(GetMastersResponseEntry.newBuilder()
+ .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false));
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
+ GetMetaRegionLocationsRequest request) throws ServiceException {
+ GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
+ Optional<List<HRegionLocation>> metaLocations =
+ regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
+ metaLocations.ifPresent(hRegionLocations -> hRegionLocations
+ .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
+ return builder.build();
+ }
+
+ @Override
+ public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
+ GetBootstrapNodesRequest request) throws ServiceException {
+ GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
+ regionServer.getRegionServerAddressTracker().getRegionServers().stream()
+ .map(ProtobufUtil::toServerName).forEach(builder::addServerName);
+ return builder.build();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
index b7ab7f0..8fbe6ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
@@ -17,18 +17,20 @@
*/
package org.apache.hadoop.hbase.security;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos;
+
/**
* Implementation of secure Hadoop policy provider for mapping
* protocol interfaces to hbase-policy.xml entries.
@@ -41,7 +43,7 @@ public class HBasePolicyProvider extends PolicyProvider {
new Service("security.client.protocol.acl",
MasterProtos.HbckService.BlockingInterface.class),
new Service("security.client.protocol.acl",
- MasterProtos.ClientMetaService.BlockingInterface.class),
+ RegistryProtos.ClientMetaService.BlockingInterface.class),
new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class),
new Service("security.masterregion.protocol.acl",
RegionServerStatusService.BlockingInterface.class)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
index 3c539a0..be8e432 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
@@ -95,7 +95,7 @@ class FromClientSideBase {
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
- MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
+ AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java
deleted file mode 100644
index ad1e738..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
-
-@Category({ClientTests.class, SmallTests.class})
-public class TestMasterAddressRefresher {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMasterAddressRefresher.class);
-
- private class DummyMasterRegistry extends MasterRegistry {
-
- private final AtomicInteger getMastersCallCounter = new AtomicInteger(0);
- private final List<Long> callTimeStamps = new ArrayList<>();
-
- DummyMasterRegistry(Configuration conf) throws IOException {
- super(conf);
- }
-
- @Override
- CompletableFuture<List<ServerName>> getMasters() {
- getMastersCallCounter.incrementAndGet();
- callTimeStamps.add(EnvironmentEdgeManager.currentTime());
- return CompletableFuture.completedFuture(new ArrayList<>());
- }
-
- public int getMastersCount() {
- return getMastersCallCounter.get();
- }
-
- public List<Long> getCallTimeStamps() {
- return callTimeStamps;
- }
- }
-
- @Test
- public void testPeriodicMasterEndPointRefresh() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- // Refresh every 1 second.
- conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, 1);
- conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
- try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
- // Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
- Waiter.waitFor(
- conf, 5000, (Waiter.Predicate<Exception>) () -> registry.getMastersCount() > 3);
- }
- }
-
- @Test
- public void testDurationBetweenRefreshes() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- // Disable periodic refresh
- conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, Integer.MAX_VALUE);
- // A minimum duration of 1s between refreshes
- conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 1);
- try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
- // Issue a ton of manual refreshes.
- for (int i = 0; i < 10000; i++) {
- registry.masterAddressRefresher.refreshNow();
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
- }
- // Overall wait time is 10000 ms, so the number of requests should be <=10
- List<Long> callTimeStamps = registry.getCallTimeStamps();
- // Actual calls to getMasters() should be much lower than the refresh count.
- Assert.assertTrue(
- String.valueOf(registry.getMastersCount()), registry.getMastersCount() <= 20);
- Assert.assertTrue(callTimeStamps.size() > 0);
- // Verify that the delta between subsequent RPCs is at least 1sec as configured.
- for (int i = 1; i < callTimeStamps.size() - 1; i++) {
- long delta = callTimeStamps.get(i) - callTimeStamps.get(i - 1);
- // Few ms cushion to account for any env jitter.
- Assert.assertTrue(callTimeStamps.toString(), delta > 990);
- }
- }
-
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 359ad61..6301b05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -53,7 +53,7 @@ public class TestMasterRegistry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMasterRegistry.class);
+ HBaseClassTestRule.forClass(TestMasterRegistry.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
@@ -90,7 +90,7 @@ public class TestMasterRegistry {
int numMasters = 10;
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
try (MasterRegistry registry = new MasterRegistry(conf)) {
- List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
+ List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedServers());
// Half of them would be without a port, duplicates are removed.
assertEquals(numMasters / 2 + 1, parsedMasters.size());
// Sort in the increasing order of port numbers.
@@ -149,17 +149,17 @@ public class TestMasterRegistry {
// Set the hedging fan out so that all masters are queried.
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
// Do not limit the number of refreshes during the test run.
- conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
+ conf.setLong(MasterRegistry.MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES, 0);
try (MasterRegistry registry = new MasterRegistry(conf)) {
- final Set<ServerName> masters = registry.getParsedMasterServers();
+ final Set<ServerName> masters = registry.getParsedServers();
assertTrue(masters.contains(badServer));
// Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails.
assertEquals(registry.getClusterId().get(), clusterId);
// Wait for new set of masters to be populated.
TEST_UTIL.waitFor(5000,
- (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));
+ (Waiter.Predicate<Exception>) () -> !registry.getParsedServers().equals(masters));
// new set of masters should not include the bad server
- final Set<ServerName> newMasters = registry.getParsedMasterServers();
+ final Set<ServerName> newMasters = registry.getParsedServers();
// Bad one should be out.
assertEquals(3, newMasters.size());
assertFalse(newMasters.contains(badServer));
@@ -170,8 +170,8 @@ public class TestMasterRegistry {
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(10000);
// Wait until the killed master de-registered. This should also trigger another refresh.
TEST_UTIL.waitFor(10000, () -> registry.getMasters().get().size() == 2);
- TEST_UTIL.waitFor(20000, () -> registry.getParsedMasterServers().size() == 2);
- final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
+ TEST_UTIL.waitFor(20000, () -> registry.getParsedServers().size() == 2);
+ final Set<ServerName> newMasters2 = registry.getParsedServers();
assertEquals(2, newMasters2.size());
assertFalse(newMasters2.contains(activeMaster.getServerName()));
} finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index 24e8823..4aadeb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -28,11 +28,11 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MetaRegionLocationCache;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -94,20 +94,20 @@ public class TestMetaRegionLocationCache {
break;
}
}
- List<HRegionLocation> metaHRLs =
- master.getMetaRegionLocationCache().getMetaRegionLocations().get();
- assertFalse(metaHRLs.isEmpty());
ZKWatcher zk = master.getZooKeeper();
List<String> metaZnodes = zk.getMetaReplicaNodes();
// Wait till all replicas available.
retries = 0;
- while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() !=
- metaZnodes.size()) {
+ while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != metaZnodes
+ .size()) {
Thread.sleep(1000);
if (++retries == 10) {
break;
}
}
+ List<HRegionLocation> metaHRLs =
+ master.getMetaRegionLocationCache().getMetaRegionLocations().get();
+ assertFalse(metaHRLs.isEmpty());
assertEquals(metaZnodes.size(), metaHRLs.size());
List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk);
Collections.sort(metaHRLs);
@@ -115,13 +115,14 @@ public class TestMetaRegionLocationCache {
assertEquals(actualHRLs, metaHRLs);
}
- @Test public void testInitialMetaLocations() throws Exception {
+ @Test
+ public void testInitialMetaLocations() throws Exception {
verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
}
- @Test public void testStandByMetaLocations() throws Exception {
+ @Test
+ public void testStandByMetaLocations() throws Exception {
HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
- standBy.isInitialized();
verifyCachedMetaLocations(standBy);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java
new file mode 100644
index 0000000..1447099
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestRegistryEndpointsRefresher {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
+
+ private static final String INTERVAL_SECS_CONFIG_NAME =
+ "hbase.test.registry.refresh.interval.secs";
+ private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
+ "hbase.test.registry.refresh.min.interval.secs";
+
+ private Configuration conf;
+ private RegistryEndpointsRefresher refresher;
+ private AtomicInteger getMastersCallCounter;
+ private CopyOnWriteArrayList<Long> callTimestamps;
+
+ @Before
+ public void setUp() {
+ conf = HBaseConfiguration.create();
+ getMastersCallCounter = new AtomicInteger(0);
+ callTimestamps = new CopyOnWriteArrayList<>();
+ }
+
+ @After
+ public void tearDown() {
+ if (refresher != null) {
+ refresher.stop();
+ }
+ }
+
+ private void refresh() {
+ getMastersCallCounter.incrementAndGet();
+ callTimestamps.add(EnvironmentEdgeManager.currentTime());
+ }
+
+ private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
+ conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
+ conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
+ refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME,
+ MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
+ refresher.start();
+ }
+
+ @Test
+ public void testPeriodicMasterEndPointRefresh() throws IOException {
+ // Refresh every 1 second.
+ createAndStartRefresher(1, 0);
+ // Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
+ Waiter.waitFor(conf, 5000, () -> getMastersCallCounter.get() > 3);
+ }
+
+ @Test
+ public void testDurationBetweenRefreshes() throws IOException {
+ // Disable periodic refresh
+ // A minimum duration of 1s between refreshes
+ createAndStartRefresher(Integer.MAX_VALUE, 1);
+ // Issue a ton of manual refreshes.
+ for (int i = 0; i < 10000; i++) {
+ refresher.refreshNow();
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+ }
+ // Overall wait time is 10000 ms, so the number of requests should be <=10
+ // Actual calls to getMasters() should be much lower than the refresh count.
+ assertTrue(String.valueOf(getMastersCallCounter.get()), getMastersCallCounter.get() <= 20);
+ assertTrue(callTimestamps.size() > 0);
+ // Verify that the delta between subsequent RPCs is at least 1sec as configured.
+ for (int i = 1; i < callTimestamps.size() - 1; i++) {
+ long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
+ // Few ms cushion to account for any env jitter.
+ assertTrue(callTimestamps.toString(), delta > 990);
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java
new file mode 100644
index 0000000..20e77a8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestRpcConnectionRegistry {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRpcConnectionRegistry.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private RpcConnectionRegistry registry;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // allow refresh immediately so we will switch to use region servers soon.
+ UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
+ UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
+ UTIL.startMiniCluster(3);
+ HBaseTestingUtility.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ registry = new RpcConnectionRegistry(UTIL.getConfiguration());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(registry, true);
+ }
+
+ @Test
+ public void testRegistryRPCs() throws Exception {
+ HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
+ // wait until we switch to use region servers
+ UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3);
+ assertThat(registry.getParsedServers(),
+ hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
+
+ // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
+ // because not all replicas had made it up before test started.
+ RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
+
+ assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
+ assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
+ List<HRegionLocation> metaLocations =
+ Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
+ List<HRegionLocation> actualMetaLocations =
+ activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
+ Collections.sort(metaLocations);
+ Collections.sort(actualMetaLocations);
+ assertEquals(actualMetaLocations, metaLocations);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 88b4fb7..39e8191 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -146,7 +146,7 @@ public class TestScannersFromClientSide {
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
- MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
+ AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
index 428aee2..dfc35f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.junit.Assert.assertEquals;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -45,14 +46,15 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
@Category({MediumTests.class, MasterTests.class})
public class TestClientMetaServiceRPCs {
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java
new file mode 100644
index 0000000..e478639
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * Class for tracking the region servers for a cluster.
+ */
+@InterfaceAudience.Private
+public class RegionServerAddressTracker extends ZKListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class);
+
+ private volatile List<ServerName> regionServers = Collections.emptyList();
+
+ private final Abortable abortable;
+
+ public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) {
+ super(watcher);
+ this.abortable = abortable;
+ watcher.registerListener(this);
+ loadRegionServerList();
+ }
+
+ private void loadRegionServerList() {
+ List<String> names;
+ try {
+ names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
+ } catch (KeeperException e) {
+ LOG.error("failed to list region servers", e);
+ abortable.abort("failed to list region servers", e);
+ return;
+ }
+ if (CollectionUtils.isEmpty(names)) {
+ regionServers = Collections.emptyList();
+ } else {
+ regionServers = names.stream().map(ServerName::parseServerName)
+ .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
+ }
+ }
+
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (path.equals(watcher.getZNodePaths().rsZNode)) {
+ loadRegionServerList();
+ }
+ }
+
+ public List<ServerName> getRegionServers() {
+ return regionServers;
+ }
+}
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java
new file mode 100644
index 0000000..6d2cc2b
--- /dev/null
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ ZKTests.class, MediumTests.class })
+public class TestRegionServerAddressTracker {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class);
+
+ private static final HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility();
+
+ private ZKWatcher zk;
+
+ private RegionServerAddressTracker tracker;
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ @Before
+ public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException {
+ TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + name.getMethodName());
+ zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null);
+ ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
+ tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ Closeables.close(zk, true);
+ }
+
+ @Test
+ public void test() throws KeeperException {
+ ServerName rs1 = ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime());
+ ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
+ TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
+ assertEquals(rs1, tracker.getRegionServers().get(0));
+
+ ServerName rs2 = ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime());
+ ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
+ TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2);
+ assertThat(tracker.getRegionServers(), hasItems(rs1, rs2));
+
+ ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
+ TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
+ assertEquals(rs2, tracker.getRegionServers().get(0));
+
+ ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
+ TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().isEmpty());
+ }
+
+ private static final class WarnOnlyAbortable implements Abortable {
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("RegionServerAddressTracker received abort, ignoring. Reason: {}", why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+ }
+}