You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2020/01/14 16:30:42 UTC
[hbase] 06/06: HBASE-23305: Master based registry implementation
(#954)
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch HBASE-18095/client-locate-meta-no-zookeeper
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d9bb034c9439d034f23dd0b66faafce9726e2fa5
Author: Bharath Vissapragada <bh...@apache.org>
AuthorDate: Tue Jan 14 08:24:07 2020 -0800
HBASE-23305: Master based registry implementation (#954)
Implements a master based registry for clients.
- Supports hedged RPCs (fan out configured via configs).
- Parameterized existing client tests to run with multiple registry combinations.
- Added unit-test coverage for the new registry implementation.
Signed-off-by: Nick Dimiduk <nd...@apache.org>
Signed-off-by: stack <st...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../hbase/client/ConnectionRegistryFactory.java | 4 +-
.../apache/hadoop/hbase/client/MasterRegistry.java | 226 +++++++++++
.../MasterRegistryFetchException.java} | 32 +-
.../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 57 +--
.../apache/hadoop/hbase/ipc/BlockingRpcClient.java | 7 +-
.../apache/hadoop/hbase/ipc/HedgedRpcChannel.java | 274 ++++++++++++++
.../apache/hadoop/hbase/ipc/NettyRpcClient.java | 34 +-
.../org/apache/hadoop/hbase/ipc/RpcClient.java | 19 +-
.../hbase/client/TestConnectionRegistryLeak.java | 3 +-
.../java/org/apache/hadoop/hbase/HConstants.java | 20 +-
.../apache/hadoop/hbase/util/PrettyPrinter.java | 23 +-
.../org/apache/hadoop/hbase/TestTableName.java | 16 +-
.../apache/hadoop/hbase/util/JVMClusterUtil.java | 7 +
.../apache/hadoop/hbase/HBaseTestingUtility.java | 3 +
.../hbase/client/DummyConnectionRegistry.java | 3 +-
.../hadoop/hbase/client/TestFromClientSide.java | 420 +++++++++++----------
.../client/TestFromClientSideWithCoprocessor.java | 23 +-
.../hadoop/hbase/client/TestMasterRegistry.java | 125 ++++++
.../hbase/client/TestScannersFromClientSide.java | 136 ++++---
.../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 120 +++++-
.../hbase/ipc/TestProtobufRpcServiceImpl.java | 25 +-
21 files changed, 1243 insertions(+), 334 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 80d358b..9308443 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -27,9 +28,6 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
final class ConnectionRegistryFactory {
- static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
- "hbase.client.connection.registry.impl";
-
private ConnectionRegistryFactory() {
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
new file mode 100644
index 0000000..5680847
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
+import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
+
+/**
+ * Master based registry implementation. Makes RPCs to the configured master addresses from config
+ * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
+ *
+ * It supports hedged reads, which can be enabled by setting
+ * {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
+ * out the requests batch is controlled by
+ * {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
+ *
+ * TODO: Handle changes to the configuration dynamically without having to restart the client.
+ */
+@InterfaceAudience.Private
+public class MasterRegistry implements ConnectionRegistry {
+ private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
+
+ // Configured list of masters to probe the meta information from.
+ private final Set<ServerName> masterServers;
+
+ // RPC client used to talk to the masters.
+ private final RpcClient rpcClient;
+ private final RpcControllerFactory rpcControllerFactory;
+ private final int rpcTimeoutMs;
+
+ MasterRegistry(Configuration conf) {
+ boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
+ MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
+ Configuration finalConf;
+ if (!hedgedReadsEnabled) {
+ // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
+ // the configuration so that other places reusing this reference is not affected.
+ finalConf = new Configuration(conf);
+ finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
+ } else {
+ finalConf = conf;
+ }
+ rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ masterServers = new HashSet<>();
+ parseMasterAddrs(finalConf);
+ rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
+ rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
+ }
+
+ /**
+ * @return Stub needed to make RPC using a hedged channel to the master end points.
+ */
+ private ClientMetaService.Interface getMasterStub() throws IOException {
+ return ClientMetaService.newStub(
+ rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+ }
+
+ /**
+ * Parses the list of master addresses from the provided configuration. Supported format is
+ * comma separated host[:port] values. If no port number if specified, default master port is
+ * assumed.
+ * @param conf Configuration to parse from.
+ */
+ private void parseMasterAddrs(Configuration conf) {
+ String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT);
+ for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
+ HostAndPort masterHostPort =
+ HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
+ masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
+ }
+ Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
+ }
+
+ @VisibleForTesting
+ public Set<ServerName> getParsedMasterServers() {
+ return Collections.unmodifiableSet(masterServers);
+ }
+
+ /**
+ * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
+ * the rpc finishes and the response is propagated to the passed future.
+ * @param future Result future to which the rpc response is propagated.
+ * @param isValidResp Checks if the rpc response has a valid result.
+ * @param transformResult Transforms the result to a different form as expected by callers.
+ * @param hrc RpcController instance for this rpc.
+ * @param debug Debug message passed along to the caller in case of exceptions.
+ * @param <T> RPC result type.
+ * @param <R> Transformed type of the result.
+ * @return A call back that can be embedded in the non-blocking rpc call.
+ */
+ private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
+ Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
+ final String debug) {
+ return rpcResult -> {
+ if (rpcResult == null) {
+ future.completeExceptionally(
+ new MasterRegistryFetchException(masterServers, hrc.getFailed()));
+ }
+ if (!isValidResp.test(rpcResult)) {
+ // Rpc returned ok, but result was malformed.
+ future.completeExceptionally(new IOException(
+ String.format("Invalid result for request %s. Will be retried", debug)));
+
+ }
+ future.complete(transformResult.apply(rpcResult));
+ };
+ }
+
+ /**
+ * Simple helper to transform the result of getMetaRegionLocations() rpc.
+ */
+ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
+ List<HRegionLocation> regionLocations = new ArrayList<>();
+ resp.getMetaLocationsList().forEach(
+ location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
+ return new RegionLocations(regionLocations);
+ }
+
+ @Override
+ public CompletableFuture<RegionLocations> getMetaRegionLocations() {
+ CompletableFuture<RegionLocations> result = new CompletableFuture<>();
+ HBaseRpcController hrc = rpcControllerFactory.newController();
+ RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
+ (rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
+ "getMetaRegionLocations()");
+ try {
+ getMasterStub().getMetaRegionLocations(
+ hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
+ } catch (IOException e) {
+ result.completeExceptionally(e);
+ }
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<String> getClusterId() {
+ CompletableFuture<String> result = new CompletableFuture<>();
+ HBaseRpcController hrc = rpcControllerFactory.newController();
+ RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
+ GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc,
+ "getClusterId()");
+ try {
+ getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
+ } catch (IOException e) {
+ result.completeExceptionally(e);
+ }
+ return result;
+ }
+
+ private ServerName transformServerName(GetActiveMasterResponse resp) {
+ return ProtobufUtil.toServerName(resp.getServerName());
+ }
+
+ @Override
+ public CompletableFuture<ServerName> getActiveMaster() {
+ CompletableFuture<ServerName> result = new CompletableFuture<>();
+ HBaseRpcController hrc = rpcControllerFactory.newController();
+ RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
+ GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
+ "getActiveMaster()");
+ try {
+ getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
+ } catch (IOException e) {
+ result.completeExceptionally(e);
+ }
+ return result;
+ }
+
+ @Override
+ public void close() {
+ if (rpcClient != null) {
+ rpcClient.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
similarity index 53%
copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
index 80d358b..18871be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
@@ -15,31 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.client;
+package org.apache.hadoop.hbase.exceptions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
+import java.util.Set;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.PrettyPrinter;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Factory class to get the instance of configured connection registry.
+ * Exception thrown when an master registry RPC fails in client. The exception includes the list of
+ * masters to which RPC was attempted and the last exception encountered. Prior exceptions are
+ * included in the logs.
*/
@InterfaceAudience.Private
-final class ConnectionRegistryFactory {
-
- static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
- "hbase.client.connection.registry.impl";
-
- private ConnectionRegistryFactory() {
- }
-
- /**
- * @return The connection registry implementation to use.
- */
- static ConnectionRegistry getRegistry(Configuration conf) {
- Class<? extends ConnectionRegistry> clazz = conf.getClass(
- CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
- ConnectionRegistry.class);
- return ReflectionUtils.newInstance(clazz, conf);
+public class MasterRegistryFetchException extends HBaseIOException {
+ public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
+ super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
+ failure);
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 629efe6..72b9f83 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -20,22 +20,6 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -43,18 +27,15 @@ import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
@@ -69,7 +50,22 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
-
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
@@ -106,7 +102,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
justification="the rest of the system which live in the different package can use")
- protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
+ protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS =
+ new HashMap<>();
static {
TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
@@ -217,7 +214,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
// have some pending calls on connection so we should not shutdown the connection outside.
// The connection itself will disconnect if there is no pending call for maxIdleTime.
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
- if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
+ }
connections.removeValue(conn.remoteId(), conn);
conn.cleanupConnection();
}
@@ -398,7 +397,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
}
}
- private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
+ Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
@@ -435,9 +434,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
} catch (Exception e) {
call.setException(toIOE(e));
}
+ return call;
}
- private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
+ InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
if (addr.isUnresolved()) {
throw new UnknownHostException("can not resolve " + sn.getServerName());
@@ -527,6 +527,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
}
+ @Override
+ public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
+ throws UnknownHostException {
+ // Check HedgedRpcChannel implementation for detailed comments.
+ throw new UnsupportedOperationException("Hedging not supported for this implementation.");
+ }
+
private static class AbstractRpcChannel {
protected final InetSocketAddress addr;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
index f84c308..22eca53 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hbase.ipc;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
import java.io.IOException;
import java.net.SocketAddress;
-
import javax.net.SocketFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.net.NetUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Does RPC against a cluster. Manages connections per regionserver in the cluster.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
new file mode 100644
index 0000000..7b681e0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.PrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+/**
+ * A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
+ * First received response is returned to the caller. This abstracts out the logic needed to batch
+ * requests to multiple end points underneath and presents itself as a single logical RpcChannel to
+ * the client.
+ *
+ * Hedging Details:
+ * ---------------
+ * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
+ * end points to make the call to. We do multiple iterations until we get a proper response to the
+ * rpc call or all the service addresses are exhausted, which ever happens first. Size of each is
+ * configurable and is also known as 'fanOutSize'.
+ *
+ * - We randomize the addresses up front so that the batch order per client is non deterministic.
+ * This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
+ * Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the
+ * load on the client and server side when configuring the fan out.
+ *
+ * - In a happy case, once we receive a response from one end point, we cancel all the
+ * other inflight rpcs in the same batch and return the response to the caller. If we do not get a
+ * valid response from any address end point, we propagate the error back to the caller.
+ *
+ * - Rpc timeouts are applied to every hedged rpc.
+ *
+ * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
+ * be hedged (for example: cluster state changing rpcs).
+ *
+ * (TODO) Retries and Adaptive hedging policy:
+ * ------------------------------------------
+ *
+ * - No retries are handled at the channel level. Retries can be built in upper layers. However the
+ * question is, do we even need retries? Hedging in fact is a substitute for retries.
+ *
+ * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
+ * policy more adaptive. In most happy cases, the rpcs from the first few end points should return
+ * right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
+ * is not needed. So, the idea is to make this request pattern pluggable so that the requests are
+ * hedged only when needed.
+ */
+class HedgedRpcChannel implements RpcChannel {
+ private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
+
+ /**
+ * Currently hedging is only supported for non-blocking connection implementation types because
+ * the channel implementation inherently relies on the connection implementation being async.
+ * Refer to the comments in doCallMethod().
+ */
+ private final NettyRpcClient rpcClient;
+ // List of service addresses to hedge the requests to.
+ private final List<InetSocketAddress> addrs;
+ private final User ticket;
+ private final int rpcTimeout;
+ // Controls the size of request fan out (number of rpcs per a single batch).
+ private final int fanOutSize;
+
+ /**
+ * A simple rpc call back implementation to notify the batch context if any rpc is successful.
+ */
+ private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
+ private final BatchRpcCtx batchRpcCtx;
+ private final HBaseRpcController rpcController;
+ BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
+ this.batchRpcCtx = batchRpcCtx;
+ this.rpcController = rpcController;
+ }
+ @Override
+ public void run(Message result) {
+ batchRpcCtx.setResultIfNotSet(result, rpcController);
+ }
+ }
+
+ /**
+ * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
+ * synchronize on multiple RPCs to different end points fetching the result. All the methods are
+ * thread-safe.
+ */
+ private static class BatchRpcCtx {
+ // Result set by the thread finishing first. Set only once.
+ private final AtomicReference<Message> result = new AtomicReference<>();
+ // Caller waits on this latch being set.
+ // We set this to 1, so that the first successful RPC result is returned to the client.
+ private CountDownLatch resultsReady = new CountDownLatch(1);
+ // Failed rpc book-keeping.
+ private AtomicInteger failedRpcCount = new AtomicInteger();
+ // All the call handles for this batch.
+ private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
+
+ // Target addresses.
+ private final List<InetSocketAddress> addresses;
+ // Called when the result is ready.
+ private final RpcCallback<Message> callBack;
+ // Last failed rpc's exception. Used to propagate the reason to the controller.
+ private IOException lastFailedRpcReason;
+
+
+ BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
+ this.addresses = addresses;
+ this.callBack = Preconditions.checkNotNull(callBack);
+ }
+
+ /**
+ * Sets the result only if it is not already set by another thread. Thread that successfully
+ * sets the result also count downs the latch.
+ * @param result Result to be set.
+ */
+ public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
+ if (rpcController.failed()) {
+ incrementFailedRpcs(rpcController.getFailed());
+ return;
+ }
+ if (this.result.compareAndSet(null, result)) {
+ resultsReady.countDown();
+ // Cancel all pending in flight calls.
+ for (Call call: callsInFlight) {
+ // It is ok to do it for all calls as it is a no-op if the call is already done.
+ final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
+ "for the same rpc already succeeded. This is not needed anymore.", call);
+ call.setException(new CallCancelledException(exceptionMsg));
+ }
+ }
+ }
+
+ /**
+ * Waits until the results are populated and calls the callback if the call is successful.
+ * @return true for successful rpc and false otherwise.
+ */
+ public boolean waitForResults() {
+ try {
+ // We do not set a timeout on await() because we rely on the underlying RPCs to timeout if
+ // something on the remote is broken. Worst case we should wait for rpc time out to kick in.
+ resultsReady.await();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
+ }
+ Message message = result.get();
+ if (message != null) {
+ callBack.run(message);
+ return true;
+ }
+ return false;
+ }
+
+ public void addCallInFlight(Call c) {
+ callsInFlight.add(c);
+ }
+
+ public void incrementFailedRpcs(IOException reason) {
+ if (failedRpcCount.incrementAndGet() == addresses.size()) {
+ lastFailedRpcReason = reason;
+ // All the rpcs in this batch have failed. Invoke the waiting threads.
+ resultsReady.countDown();
+ }
+ }
+
+ public IOException getLastFailedRpcReason() {
+ return lastFailedRpcReason;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
+ }
+ }
+
+ public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
+ User ticket, int rpcTimeout, int fanOutSize) {
+ this.rpcClient = rpcClient;
+ this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
+ Preconditions.checkArgument(this.addrs.size() >= 1);
+ // For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
+ // order, creating hot spots on the service end points.
+ Collections.shuffle(this.addrs);
+ this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
+ // fanOutSize controls the number of hedged RPCs per batch.
+ this.fanOutSize = fanOutSize;
+ }
+
+ private HBaseRpcController applyRpcTimeout(RpcController controller) {
+ HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
+ int rpcTimeoutToSet =
+ hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
+ HBaseRpcController response = new HBaseRpcControllerImpl();
+ response.setCallTimeout(rpcTimeoutToSet);
+ return response;
+ }
+
+ private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
+ Message request, Message responsePrototype, RpcCallback<Message> done) {
+ int i = 0;
+ BatchRpcCtx lastBatchCtx = null;
+ while (i < addrs.size()) {
+ // Each iteration picks fanOutSize addresses to run as batch.
+ int batchEnd = Math.min(addrs.size(), i + fanOutSize);
+ List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
+ BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
+ lastBatchCtx = batchRpcCtx;
+ LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
+ for (InetSocketAddress address : addrSubList) {
+ HBaseRpcController rpcController = applyRpcTimeout(controller);
+ // ** WARN ** This is a blocking call if the underlying connection for the rpc client is
+ // a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
+ // the write calls. Handling blocking connection means that this should be run in a separate
+ // thread and hence more code complexity. Is it ok to handle only non-blocking connections?
+ batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
+ responsePrototype, ticket, address,
+ new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
+ }
+ if (batchRpcCtx.waitForResults()) {
+ return;
+ }
+ // Entire batch has failed, lets try the next batch.
+ LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
+ i = batchEnd;
+ }
+ Preconditions.checkNotNull(lastBatchCtx);
+ // All the batches failed, mark it a failed rpc.
+ // Propagate the failure reason. We propagate the last batch's last failing rpc reason.
+ // Can we do something better?
+ controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
+ done.run(null);
+ }
+
+ @Override
+ public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
+ Message request, Message responsePrototype, RpcCallback<Message> done) {
+ // There is no reason to use any other implementation of RpcController.
+ Preconditions.checkState(controller instanceof HBaseRpcController);
+ // To make the channel non-blocking, we run the actual doCalMethod() async. The call back is
+ // called once the hedging finishes.
+ CompletableFuture.runAsync(
+ () -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index 61dedbb..c4f70b0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -17,21 +17,26 @@
*/
package org.apache.hadoop.hbase.ipc;
-import org.apache.hbase.thirdparty.io.netty.channel.Channel;
-import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
-import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
-
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
-
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
/**
* Netty client for the requests and responses.
@@ -75,6 +80,19 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
}
@Override
+ public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
+ throws UnknownHostException {
+ final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
+ HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+ Set<InetSocketAddress> addresses = new HashSet<>();
+ for (ServerName sn: sns) {
+ addresses.add(createAddr(sn));
+ }
+ return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
+ hedgedRpcFanOut);
+ }
+
+ @Override
protected void closeInternal() {
if (shutdownGroupWhenClose) {
group.shutdownGracefully();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 0e00695..558fcee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -17,15 +17,14 @@
*/
package org.apache.hadoop.hbase.ipc;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
-
import java.io.Closeable;
import java.io.IOException;
-
+import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
/**
* Interface for RpcClient implementations so ConnectionManager can handle it.
@@ -84,6 +83,16 @@ public interface RpcClient extends Closeable {
throws IOException;
/**
+ * Creates a channel that can hedge request to multiple underlying channels.
+ * @param sns Set of servers for underlying channels.
+ * @param user user for the connection.
+ * @param rpcTimeout rpc timeout to use.
+ * @return A hedging rpc channel for this rpc client instance.
+ */
+ RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
+ throws IOException;
+
+ /**
* Interrupt the connections to the given server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,
* depending on their own behavior, they may retry on the same server. This can be a feature,
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java
index f02ec42..561b1f5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils;
@@ -70,7 +71,7 @@ public class TestConnectionRegistryLeak {
@BeforeClass
public static void setUp() {
- CONF.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+ CONF.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ConnectionRegistryForTest.class, ConnectionRegistry.class);
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 132d3e0..ef525d7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -180,10 +180,17 @@ public final class HConstants {
public static final String MASTER_INFO_PORT = "hbase.master.info.port";
/** Configuration key for the list of master host:ports **/
- public static final String MASTER_ADDRS_KEY = "hbase.master.addrs";
+ public static final String MASTER_ADDRS_KEY = "hbase.masters";
public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;
+ /** Configuration to enable hedged reads on master registry **/
+ public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
+ "hbase.client.master_registry.enable_hedged_reads";
+
+ /** Default value for enabling hedging reads on master registry **/
+ public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
+
/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
@@ -909,6 +916,12 @@ public final class HConstants {
*/
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
+ /** Configuration key that controls the fan out of requests in hedged channel implementation. **/
+ public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
+
+ /** Default value for the fan out of hedged requests. **/
+ public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
+
/**
* timeout for each read RPC
*/
@@ -940,6 +953,11 @@ public final class HConstants {
*/
public static final long NO_SEQNUM = -1;
+ /**
+ * Registry implementation to be used on the client side.
+ */
+ public static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
+ "hbase.client.registry.impl";
/*
* cluster replication constants.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java
index 147e916..ff7064b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java
@@ -19,9 +19,12 @@
package org.apache.hadoop.hbase.util;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -29,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
-public class PrettyPrinter {
+public final class PrettyPrinter {
private static final Logger LOG = LoggerFactory.getLogger(PrettyPrinter.class);
@@ -117,7 +120,7 @@ public class PrettyPrinter {
sb.append(" DAY").append(days == 1 ? "" : "S");
}
- if (hours > 0 ) {
+ if (hours > 0) {
sb.append(days > 0 ? " " : "");
sb.append(hours);
sb.append(" HOUR").append(hours == 1 ? "" : "S");
@@ -188,4 +191,18 @@ public class PrettyPrinter {
return ttl;
}
+ /**
+ * Pretty prints a collection of any type to a string. Relies on toString() implementation of the
+ * object type.
+ * @param collection collection to pretty print.
+ * @return Pretty printed string for the collection.
+ */
+ public static String toString(Collection<?> collection) {
+ List<String> stringList = new ArrayList<>();
+ for (Object o: collection) {
+ stringList.add(Objects.toString(o));
+ }
+ return "[" + String.join(",", stringList) + "]";
+ }
+
}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java
index 43a384a..7e44399 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java
@@ -51,7 +51,21 @@ public class TestTableName extends TestWatcher {
*/
@Override
protected void starting(Description description) {
- tableName = TableName.valueOf(description.getMethodName());
+ tableName = TableName.valueOf(cleanUpTestName(description.getMethodName()));
+ }
+
+ /**
+ * Helper to handle parameterized method names. Unlike regular test methods, parameterized method
+ * names look like 'foo[x]'. This is problematic for tests that use this name for HBase tables.
+ * This helper strips out the parameter suffixes.
+ * @return current test method name with out parameterized suffixes.
+ */
+ private static String cleanUpTestName(String methodName) {
+ int index = methodName.indexOf('[');
+ if (index == -1) {
+ return methodName;
+ }
+ return methodName.substring(0, index);
}
public TableName getTableName() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 5c6ad95..cfa6f75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -26,12 +26,14 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Utility used running a cluster all in the one JVM.
@@ -136,6 +138,11 @@ public class JVMClusterUtil {
} catch (Exception e) {
throw new IOException(e);
}
+ // Needed if a master based registry is configured for internal cluster connections. Here, we
+ // just add the current master host port since we do not know other master addresses up front
+ // in mini cluster tests.
+ c.set(HConstants.MASTER_ADDRS_KEY,
+ Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
return new JVMClusterUtil.MasterThread(server, index);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index f1e91de..d1f2f1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1128,6 +1128,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
this.hbaseCluster =
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
+ // Populate the master address configuration from mini cluster configuration.
+ conf.set(HConstants.MASTER_ADDRS_KEY,
+ c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
// Don't leave here till we've done a successful scan of the hbase:meta
Table t = getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
index a669362..c9d67f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
@@ -28,7 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
public class DummyConnectionRegistry implements ConnectionRegistry {
public static final String REGISTRY_IMPL_CONF_KEY =
- ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
+ HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 61d78ce..4f4fe79 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,10 +28,10 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -64,7 +64,9 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TestTableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -106,23 +108,28 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.NonRepeatedEnvironmentEdge;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Run tests that use the HBase clients; {@link Table}.
* Sets up the HBase mini cluster once at start and runs through all client tests.
* Each creates a table named for the method and does its stuff against that.
+ *
+ * Parameterized to run with different registry implementations.
*/
@Category({LargeTests.class, ClientTests.class})
@SuppressWarnings ("deprecation")
+@RunWith(Parameterized.class)
public class TestFromClientSide {
@ClassRule
@@ -131,7 +138,7 @@ public class TestFromClientSide {
// NOTE: Increment tests were moved to their own class, TestIncrementsFromClientSide.
private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class);
- protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ protected static HBaseTestingUtility TEST_UTIL;
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static final byte[] INVALID_FAMILY = Bytes.toBytes("invalidTestFamily");
@@ -139,10 +146,54 @@ public class TestFromClientSide {
private static byte [] VALUE = Bytes.toBytes("testValue");
protected static int SLAVES = 3;
- @Rule
- public TestName name = new TestName();
+ @Rule public TestTableName name = new TestTableName();
+
+ // To keep the child classes happy.
+ TestFromClientSide() {}
+
+ public TestFromClientSide(Class registry, int numHedgedReqs) throws Exception {
+ initialize(registry, numHedgedReqs, MultiRowMutationEndpoint.class);
+ }
+
+ @Parameterized.Parameters
+ public static Collection parameters() {
+ return Arrays.asList(new Object[][] {
+ { MasterRegistry.class, 1},
+ { MasterRegistry.class, 2},
+ { ZKConnectionRegistry.class, 1}
+ });
+ }
+
+ /**
+ * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
+ * there is no easy way to restart the test cluster after each parameterized run. Annotation
+ * BeforeParam does not work either because it runs before parameterization and hence does not
+ * have access to the test parameters (which is weird).
+ *
+ * This *hack* checks if the current instance of test cluster configuration has the passed
+ * parameterized configs. In such a case, we can just reuse the cluster for test and do not need
+ * to initialize from scratch. While this is a hack, it saves a ton of time for the full
+ * test and de-flakes it.
+ */
+ private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
+ if (TEST_UTIL == null) {
+ return false;
+ }
+ Configuration conf = TEST_UTIL.getConfiguration();
+ Class confClass = conf.getClass(
+ HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
+ int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
+ HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+ return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
+ }
- protected static final void initialize(Class<?>... cps) throws Exception {
+ protected static final void initialize(Class registryImpl, int numHedgedReqs, Class<?>... cps)
+ throws Exception {
+ // initialize() is called for every unit test, however we only want to reset the cluster state
+ // at the end of every parameterized run.
+ if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
+ return;
+ }
// Uncomment the following lines if more verbosity is needed for
// debugging (see HBASE-12285 for details).
// ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@@ -150,22 +201,35 @@ public class TestFromClientSide {
// ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
// make sure that we do not get the same ts twice, see HBASE-19731 for more details.
EnvironmentEdgeManager.injectEdge(new NonRepeatedEnvironmentEdge());
+ if (TEST_UTIL != null) {
+ // We reached end of a parameterized run, clean up.
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ TEST_UTIL = new HBaseTestingUtility();
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
Arrays.stream(cps).map(Class::getName).toArray(String[]::new));
conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); // enable for below tests
- // We need more than one region server in this test
- TEST_UTIL.startMiniCluster(SLAVES);
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- initialize(MultiRowMutationEndpoint.class);
+ conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
+ ConnectionRegistry.class);
+ if (numHedgedReqs == 1) {
+ conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
+ } else {
+ Preconditions.checkArgument(numHedgedReqs > 1);
+ conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
+ }
+ conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+ StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
+ // Multiple masters needed only when hedged reads for master registry are enabled.
+ builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES);
+ TEST_UTIL.startMiniCluster(builder.build());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
+ if (TEST_UTIL != null) {
+ TEST_UTIL.shutdownMiniCluster();
+ }
}
/**
@@ -173,7 +237,7 @@ public class TestFromClientSide {
*/
@Test
public void testDuplicateAppend() throws Exception {
- HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getMethodName());
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getTableName());
Map<String, String> kvs = new HashMap<>();
kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
hdt.addCoprocessor(SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
@@ -181,11 +245,11 @@ public class TestFromClientSide {
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
- // Client will retry beacuse rpc timeout is small than the sleep time of first rpc call
+ // Client will retry because rpc timeout is small than the sleep time of first rpc call
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
try (Connection connection = ConnectionFactory.createConnection(c);
- Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null)
+ Table table = connection.getTableBuilder(name.getTableName(), null)
.setOperationTimeout(3 * 1000).build()) {
Append append = new Append(ROW);
append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE);
@@ -209,7 +273,7 @@ public class TestFromClientSide {
*/
@Test
public void testKeepDeletedCells() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
final byte[] FAMILY = Bytes.toBytes("family");
final byte[] C0 = Bytes.toBytes("c0");
@@ -275,7 +339,7 @@ public class TestFromClientSide {
*/
@Test
public void testPurgeFutureDeletes() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
final byte[] ROW = Bytes.toBytes("row");
final byte[] FAMILY = Bytes.toBytes("family");
final byte[] COLUMN = Bytes.toBytes("column");
@@ -328,7 +392,7 @@ public class TestFromClientSide {
*/
@Test
public void testGetConfiguration() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
Configuration conf = TEST_UTIL.getConfiguration();
try (Table table = TEST_UTIL.createTable(tableName, FAMILIES)) {
@@ -342,7 +406,7 @@ public class TestFromClientSide {
*/
@Test
public void testWeirdCacheBehaviour() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"),
Bytes.toBytes("trans-type"), Bytes.toBytes("trans-date"),
Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
@@ -383,7 +447,7 @@ public class TestFromClientSide {
}
private void deleteColumns(Table ht, String value, String keyPrefix)
- throws IOException {
+ throws IOException {
ResultScanner scanner = buildScanner(keyPrefix, value, ht);
Iterator<Result> it = scanner.iterator();
int count = 0;
@@ -468,8 +532,8 @@ public class TestFromClientSide {
*/
@Test
public void testFilterAcrossMultipleRegions()
- throws IOException, InterruptedException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ throws IOException, InterruptedException {
+ final TableName tableName = name.getTableName();
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
assertRowCount(t, rowCount);
@@ -555,8 +619,7 @@ public class TestFromClientSide {
* @param t Table to split.
* @return Map of regions to servers.
*/
- private List<HRegionLocation> splitTable(final Table t)
- throws IOException, InterruptedException {
+ private List<HRegionLocation> splitTable(final Table t) throws IOException {
// Split this table in two.
Admin admin = TEST_UTIL.getAdmin();
admin.split(t.getName());
@@ -572,8 +635,7 @@ public class TestFromClientSide {
* @param t
* @return Map of table regions; caller needs to check table actually split.
*/
- private List<HRegionLocation> waitOnSplit(final Table t)
- throws IOException {
+ private List<HRegionLocation> waitOnSplit(final Table t) throws IOException {
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(t.getName())) {
List<HRegionLocation> regions = locator.getAllRegionLocations();
int originalCount = regions.size();
@@ -585,8 +647,9 @@ public class TestFromClientSide {
e.printStackTrace();
}
regions = locator.getAllRegionLocations();
- if (regions.size() > originalCount)
+ if (regions.size() > originalCount) {
break;
+ }
}
return regions;
}
@@ -594,7 +657,7 @@ public class TestFromClientSide {
@Test
public void testSuperSimple() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
@@ -610,7 +673,7 @@ public class TestFromClientSide {
@Test
public void testMaxKeyValueSize() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
Configuration conf = TEST_UTIL.getConfiguration();
String oldMaxSize = conf.get(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY);
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -640,7 +703,7 @@ public class TestFromClientSide {
@Test
public void testFilters() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] ROWS = makeN(ROW, 10);
byte[][] QUALIFIERS = {
@@ -677,7 +740,7 @@ public class TestFromClientSide {
@Test
public void testFilterWithLongCompartor() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] ROWS = makeN(ROW, 10);
byte[][] values = new byte[10][];
@@ -709,7 +772,7 @@ public class TestFromClientSide {
@Test
public void testKeyOnlyFilter() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] ROWS = makeN(ROW, 10);
byte[][] QUALIFIERS = {
@@ -747,7 +810,7 @@ public class TestFromClientSide {
*/
@Test
public void testSimpleMissing() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] ROWS = makeN(ROW, 4);
@@ -858,7 +921,7 @@ public class TestFromClientSide {
*/
@Test
public void testSingleRowMultipleFamily() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] ROWS = makeN(ROW, 3);
byte [][] FAMILIES = makeNAscii(FAMILY, 10);
byte [][] QUALIFIERS = makeN(QUALIFIER, 10);
@@ -1166,7 +1229,7 @@ public class TestFromClientSide {
@Test(expected = IllegalArgumentException.class)
public void testNullFamilyName() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
// Null family (should NOT work)
TEST_UTIL.createTable(tableName, new byte[][]{null});
@@ -1175,7 +1238,7 @@ public class TestFromClientSide {
@Test
public void testNullRowAndQualifier() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -1211,7 +1274,7 @@ public class TestFromClientSide {
@Test
public void testNullEmptyQualifier() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -1249,7 +1312,7 @@ public class TestFromClientSide {
@Test
public void testNullValue() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
// Null value
@@ -1284,7 +1347,7 @@ public class TestFromClientSide {
@Test
public void testNullQualifier() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
// Work for Put
@@ -1342,7 +1405,7 @@ public class TestFromClientSide {
@Test
public void testVersions() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
long [] STAMPS = makeStamps(20);
byte [][] VALUES = makeNAscii(VALUE, 20);
@@ -1569,7 +1632,7 @@ public class TestFromClientSide {
@Test
@SuppressWarnings("checkstyle:MethodLength")
public void testVersionLimits() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] FAMILIES = makeNAscii(FAMILY, 3);
int [] LIMITS = {1,3,5};
long [] STAMPS = makeStamps(10);
@@ -1764,7 +1827,7 @@ public class TestFromClientSide {
@Test
public void testDeleteFamilyVersion() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte[][] QUALIFIERS = makeNAscii(QUALIFIER, 1);
byte[][] VALUES = makeN(VALUE, 5);
@@ -1804,7 +1867,7 @@ public class TestFromClientSide {
@Test
public void testDeleteFamilyVersionWithOtherDeletes() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 5);
byte [][] VALUES = makeN(VALUE, 5);
@@ -1922,7 +1985,7 @@ public class TestFromClientSide {
@Test
public void testDeleteWithFailed() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] FAMILIES = makeNAscii(FAMILY, 3);
byte [][] VALUES = makeN(VALUE, 5);
@@ -1948,7 +2011,7 @@ public class TestFromClientSide {
@Test
public void testDeletes() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] ROWS = makeNAscii(ROW, 6);
byte [][] FAMILIES = makeNAscii(FAMILY, 3);
@@ -2254,7 +2317,7 @@ public class TestFromClientSide {
*/
@Test
public void testBatchOperationsWithErrors() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table foo = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 10)) {
int NUM_OPS = 100;
@@ -2380,7 +2443,7 @@ public class TestFromClientSide {
int numRows = 10;
int numColsPerRow = 2000;
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] ROWS = makeN(ROW, numRows);
byte [][] QUALIFIERS = makeN(QUALIFIER, numColsPerRow);
@@ -2463,7 +2526,7 @@ public class TestFromClientSide {
*/
@Test
public void testJiraTest861() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] VALUES = makeNAscii(VALUE, 7);
long [] STAMPS = makeStamps(7);
@@ -2526,7 +2589,7 @@ public class TestFromClientSide {
*/
@Test
public void testJiraTest33() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] VALUES = makeNAscii(VALUE, 7);
long [] STAMPS = makeStamps(7);
@@ -2574,7 +2637,7 @@ public class TestFromClientSide {
*/
@Test
public void testJiraTest1014() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
@@ -2598,7 +2661,7 @@ public class TestFromClientSide {
*/
@Test
public void testJiraTest1182() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] VALUES = makeNAscii(VALUE, 7);
long [] STAMPS = makeStamps(7);
@@ -2642,7 +2705,7 @@ public class TestFromClientSide {
*/
@Test
public void testJiraTest52() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] VALUES = makeNAscii(VALUE, 7);
long [] STAMPS = makeStamps(7);
@@ -2678,8 +2741,7 @@ public class TestFromClientSide {
private void getVersionRangeAndVerifyGreaterThan(Table ht, byte [] row,
byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
- int start, int end)
- throws IOException {
+ int start, int end) throws IOException {
Get get = new Get(row);
get.addColumn(family, qualifier);
get.readVersions(Integer.MAX_VALUE);
@@ -2689,8 +2751,7 @@ public class TestFromClientSide {
}
private void getVersionRangeAndVerify(Table ht, byte [] row, byte [] family,
- byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
- throws IOException {
+ byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
Get get = new Get(row);
get.addColumn(family, qualifier);
get.readVersions(Integer.MAX_VALUE);
@@ -2700,8 +2761,7 @@ public class TestFromClientSide {
}
private void getAllVersionsAndVerify(Table ht, byte [] row, byte [] family,
- byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
- throws IOException {
+ byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
Get get = new Get(row);
get.addColumn(family, qualifier);
get.readVersions(Integer.MAX_VALUE);
@@ -2711,8 +2771,7 @@ public class TestFromClientSide {
private void scanVersionRangeAndVerifyGreaterThan(Table ht, byte [] row,
byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
- int start, int end)
- throws IOException {
+ int start, int end) throws IOException {
Scan scan = new Scan(row);
scan.addColumn(family, qualifier);
scan.setMaxVersions(Integer.MAX_VALUE);
@@ -2722,8 +2781,7 @@ public class TestFromClientSide {
}
private void scanVersionRangeAndVerify(Table ht, byte [] row, byte [] family,
- byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
- throws IOException {
+ byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
Scan scan = new Scan(row);
scan.addColumn(family, qualifier);
scan.setMaxVersions(Integer.MAX_VALUE);
@@ -2733,8 +2791,7 @@ public class TestFromClientSide {
}
private void scanAllVersionsAndVerify(Table ht, byte [] row, byte [] family,
- byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
- throws IOException {
+ byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
Scan scan = new Scan(row);
scan.addColumn(family, qualifier);
scan.setMaxVersions(Integer.MAX_VALUE);
@@ -2743,8 +2800,7 @@ public class TestFromClientSide {
}
private void getVersionAndVerify(Table ht, byte [] row, byte [] family,
- byte [] qualifier, long stamp, byte [] value)
- throws Exception {
+ byte [] qualifier, long stamp, byte [] value) throws Exception {
Get get = new Get(row);
get.addColumn(family, qualifier);
get.setTimestamp(stamp);
@@ -2754,8 +2810,7 @@ public class TestFromClientSide {
}
private void getVersionAndVerifyMissing(Table ht, byte [] row, byte [] family,
- byte [] qualifier, long stamp)
- throws Exception {
+ byte [] qualifier, long stamp) throws Exception {
Get get = new Get(row);
get.addColumn(family, qualifier);
get.setTimestamp(stamp);
@@ -2765,8 +2820,7 @@ public class TestFromClientSide {
}
private void scanVersionAndVerify(Table ht, byte [] row, byte [] family,
- byte [] qualifier, long stamp, byte [] value)
- throws Exception {
+ byte [] qualifier, long stamp, byte [] value) throws Exception {
Scan scan = new Scan(row);
scan.addColumn(family, qualifier);
scan.setTimestamp(stamp);
@@ -2776,8 +2830,7 @@ public class TestFromClientSide {
}
private void scanVersionAndVerifyMissing(Table ht, byte [] row,
- byte [] family, byte [] qualifier, long stamp)
- throws Exception {
+ byte [] family, byte [] qualifier, long stamp) throws Exception {
Scan scan = new Scan(row);
scan.addColumn(family, qualifier);
scan.setTimestamp(stamp);
@@ -2786,10 +2839,7 @@ public class TestFromClientSide {
assertNullResult(result);
}
- private void getTestNull(Table ht, byte [] row, byte [] family,
- byte [] value)
- throws Exception {
-
+ private void getTestNull(Table ht, byte [] row, byte [] family, byte [] value) throws Exception {
Get get = new Get(row);
get.addColumn(family, null);
Result result = ht.get(get);
@@ -2866,9 +2916,7 @@ public class TestFromClientSide {
}
private void singleRowGetTest(Table ht, byte [][] ROWS, byte [][] FAMILIES,
- byte [][] QUALIFIERS, byte [][] VALUES)
- throws Exception {
-
+ byte [][] QUALIFIERS, byte [][] VALUES) throws Exception {
// Single column from memstore
Get get = new Get(ROWS[0]);
get.addColumn(FAMILIES[4], QUALIFIERS[0]);
@@ -2923,7 +2971,7 @@ public class TestFromClientSide {
assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
new int [][] {
{2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
- });
+ });
// Multiple columns from everywhere storefile, many family, wildcard
get = new Get(ROWS[0]);
@@ -2939,7 +2987,7 @@ public class TestFromClientSide {
assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
new int [][] {
{2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
- });
+ });
// Everything
get = new Get(ROWS[0]);
@@ -2947,7 +2995,7 @@ public class TestFromClientSide {
assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
new int [][] {
{2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
- });
+ });
// Get around inserted columns
@@ -2964,9 +3012,7 @@ public class TestFromClientSide {
}
private void singleRowScanTest(Table ht, byte [][] ROWS, byte [][] FAMILIES,
- byte [][] QUALIFIERS, byte [][] VALUES)
- throws Exception {
-
+ byte [][] QUALIFIERS, byte [][] VALUES) throws Exception {
// Single column from memstore
Scan scan = new Scan();
scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
@@ -3021,7 +3067,7 @@ public class TestFromClientSide {
assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
new int [][] {
{2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
- });
+ });
// Multiple columns from everywhere storefile, many family, wildcard
scan = new Scan();
@@ -3037,7 +3083,7 @@ public class TestFromClientSide {
assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
new int [][] {
{2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
- });
+ });
// Everything
scan = new Scan();
@@ -3045,7 +3091,7 @@ public class TestFromClientSide {
assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
new int [][] {
{2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
- });
+ });
// Scan around inserted columns
@@ -3065,13 +3111,9 @@ public class TestFromClientSide {
* Expects family and qualifier arrays to be valid for at least
* the range: idx-2 < idx < idx+2
*/
- private void getVerifySingleColumn(Table ht,
- byte [][] ROWS, int ROWIDX,
- byte [][] FAMILIES, int FAMILYIDX,
- byte [][] QUALIFIERS, int QUALIFIERIDX,
- byte [][] VALUES, int VALUEIDX)
- throws Exception {
-
+ private void getVerifySingleColumn(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+ int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX, byte [][] VALUES, int VALUEIDX)
+ throws Exception {
Get get = new Get(ROWS[ROWIDX]);
Result result = ht.get(get);
assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX],
@@ -3123,13 +3165,9 @@ public class TestFromClientSide {
* the range: idx-2 to idx+2
* Expects row array to be valid for at least idx to idx+2
*/
- private void scanVerifySingleColumn(Table ht,
- byte [][] ROWS, int ROWIDX,
- byte [][] FAMILIES, int FAMILYIDX,
- byte [][] QUALIFIERS, int QUALIFIERIDX,
- byte [][] VALUES, int VALUEIDX)
- throws Exception {
-
+ private void scanVerifySingleColumn(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+ int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX, byte [][] VALUES, int VALUEIDX)
+ throws Exception {
Scan scan = new Scan();
Result result = getSingleScanResult(ht, scan);
assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX],
@@ -3183,12 +3221,8 @@ public class TestFromClientSide {
* Verify we do not read any values by accident around a single column
* Same requirements as getVerifySingleColumn
*/
- private void getVerifySingleEmpty(Table ht,
- byte [][] ROWS, int ROWIDX,
- byte [][] FAMILIES, int FAMILYIDX,
- byte [][] QUALIFIERS, int QUALIFIERIDX)
- throws Exception {
-
+ private void getVerifySingleEmpty(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+ int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) throws Exception {
Get get = new Get(ROWS[ROWIDX]);
get.addFamily(FAMILIES[4]);
get.addColumn(FAMILIES[4], QUALIFIERS[1]);
@@ -3214,12 +3248,8 @@ public class TestFromClientSide {
}
- private void scanVerifySingleEmpty(Table ht,
- byte [][] ROWS, int ROWIDX,
- byte [][] FAMILIES, int FAMILYIDX,
- byte [][] QUALIFIERS, int QUALIFIERIDX)
- throws Exception {
-
+ private void scanVerifySingleEmpty(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+ int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) throws Exception {
Scan scan = new Scan(ROWS[ROWIDX+1]);
Result result = getSingleScanResult(ht, scan);
assertNullResult(result);
@@ -3244,9 +3274,7 @@ public class TestFromClientSide {
// Verifiers
//
- private void assertKey(Cell key, byte [] row, byte [] family,
- byte [] qualifier, byte [] value)
- throws Exception {
+ private void assertKey(Cell key, byte [] row, byte [] family, byte [] qualifier, byte [] value) {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
"Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]",
equals(row, CellUtil.cloneRow(key)));
@@ -3262,8 +3290,7 @@ public class TestFromClientSide {
}
static void assertIncrementKey(Cell key, byte [] row, byte [] family,
- byte [] qualifier, long value)
- throws Exception {
+ byte [] qualifier, long value) {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
"Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]",
equals(row, CellUtil.cloneRow(key)));
@@ -3284,9 +3311,7 @@ public class TestFromClientSide {
}
private void assertNResult(Result result, byte [] row,
- byte [][] families, byte [][] qualifiers, byte [][] values,
- int [][] idxs)
- throws Exception {
+ byte [][] families, byte [][] qualifiers, byte [][] values, int [][] idxs) {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
@@ -3318,8 +3343,7 @@ public class TestFromClientSide {
private void assertNResult(Result result, byte [] row,
byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
- int start, int end)
- throws IOException {
+ int start, int end) {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
@@ -3353,8 +3377,7 @@ public class TestFromClientSide {
*/
private void assertDoubleResult(Result result, byte [] row,
byte [] familyA, byte [] qualifierA, byte [] valueA,
- byte [] familyB, byte [] qualifierB, byte [] valueB)
- throws Exception {
+ byte [] familyB, byte [] qualifierB, byte [] valueB) {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
@@ -3384,8 +3407,7 @@ public class TestFromClientSide {
}
private void assertSingleResult(Result result, byte [] row, byte [] family,
- byte [] qualifier, byte [] value)
- throws Exception {
+ byte [] qualifier, byte [] value) {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
@@ -3423,8 +3445,7 @@ public class TestFromClientSide {
}
private void assertSingleResult(Result result, byte [] row, byte [] family,
- byte [] qualifier, long ts, byte [] value)
- throws Exception {
+ byte [] qualifier, long ts, byte [] value) {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
@@ -3515,7 +3536,7 @@ public class TestFromClientSide {
@Test
public void testDuplicateVersions() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
long [] STAMPS = makeStamps(20);
byte [][] VALUES = makeNAscii(VALUE, 20);
@@ -3738,7 +3759,7 @@ public class TestFromClientSide {
@Test
public void testUpdates() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
// Write a column with values at timestamp 1, 2 and 3
@@ -3788,7 +3809,7 @@ public class TestFromClientSide {
@Test
public void testUpdatesWithMajorCompaction() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10);
Admin admin = TEST_UTIL.getAdmin()) {
@@ -3849,7 +3870,7 @@ public class TestFromClientSide {
@Test
public void testMajorCompactionBetweenTwoUpdates() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10);
Admin admin = TEST_UTIL.getAdmin()) {
@@ -3916,7 +3937,7 @@ public class TestFromClientSide {
@Test
public void testGet_EmptyTable() throws IOException {
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
Get get = new Get(ROW);
get.addFamily(FAMILY);
Result r = table.get(get);
@@ -3926,7 +3947,7 @@ public class TestFromClientSide {
@Test
public void testGet_NullQualifier() throws IOException {
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put);
@@ -3950,7 +3971,7 @@ public class TestFromClientSide {
@Test
public void testGet_NonExistentRow() throws IOException {
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put);
@@ -3978,7 +3999,7 @@ public class TestFromClientSide {
final byte [] row1 = Bytes.toBytes("row1");
final byte [] row2 = Bytes.toBytes("row2");
final byte [] value = Bytes.toBytes("abcd");
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+ try (Table table = TEST_UTIL.createTable(name.getTableName(),
new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY })) {
Put put = new Put(row1);
put.addColumn(CONTENTS_FAMILY, null, value);
@@ -4017,7 +4038,7 @@ public class TestFromClientSide {
public void testPutNoCF() throws IOException {
final byte[] BAD_FAM = Bytes.toBytes("BAD_CF");
final byte[] VAL = Bytes.toBytes(100);
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
boolean caughtNSCFE = false;
try {
@@ -4037,7 +4058,7 @@ public class TestFromClientSide {
final byte[] SMALL_FAMILY = Bytes.toBytes("smallfam");
final int NB_BATCH_ROWS = 10;
final byte[] value = Bytes.toBytes("abcd");
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+ try (Table table = TEST_UTIL.createTable(name.getTableName(),
new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY })) {
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS; i++) {
@@ -4067,7 +4088,7 @@ public class TestFromClientSide {
final byte[] SMALL_FAMILY = Bytes.toBytes("smallfam");
final byte[] value = Bytes.toBytes("abcd");
final int NB_BATCH_ROWS = 10;
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+ try (Table table = TEST_UTIL.createTable(name.getTableName(),
new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY })) {
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
@@ -4130,7 +4151,7 @@ public class TestFromClientSide {
final byte [] FAM1 = Bytes.toBytes("fam1");
final byte [] FAM2 = Bytes.toBytes("fam2");
// Open table
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+ try (Table table = TEST_UTIL.createTable(name.getTableName(),
new byte [][] {FAM1, FAM2})) {
// Insert some values
Put put = new Put(ROW);
@@ -4213,9 +4234,10 @@ public class TestFromClientSide {
@Test
public void testListTables() throws IOException, InterruptedException {
- final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
- final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
- final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
+ final String testTableName = name.getTableName().toString();
+ final TableName tableName1 = TableName.valueOf(testTableName + "1");
+ final TableName tableName2 = TableName.valueOf(testTableName + "2");
+ final TableName tableName3 = TableName.valueOf(testTableName + "3");
TableName [] tables = new TableName[] { tableName1, tableName2, tableName3 };
for (int i = 0; i < tables.length; i++) {
TEST_UTIL.createTable(tables[i], FAMILY);
@@ -4244,7 +4266,7 @@ public class TestFromClientSide {
*/
@Test
public void testUnmanagedHConnection() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
Table t = conn.getTable(tableName);
@@ -4260,7 +4282,13 @@ public class TestFromClientSide {
*/
@Test
public void testUnmanagedHConnectionReconnect() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ Configuration conf = TEST_UTIL.getConfiguration();
+ Class registryImpl = conf.getClass(
+ HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
+ // This test does not make sense for MasterRegistry since it stops the only master in the
+ // cluster and starts a new master without populating the underlying config for the connection.
+ Assume.assumeFalse(registryImpl.equals(MasterRegistry.class));
+ final TableName tableName = name.getTableName();
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
try (Table t = conn.getTable(tableName); Admin admin = conn.getAdmin()) {
@@ -4290,8 +4318,9 @@ public class TestFromClientSide {
@Test
public void testMiscHTableStuff() throws IOException {
- final TableName tableAname = TableName.valueOf(name.getMethodName() + "A");
- final TableName tableBname = TableName.valueOf(name.getMethodName() + "B");
+ final String testTableName = name.getTableName().toString();
+ final TableName tableAname = TableName.valueOf(testTableName + "A");
+ final TableName tableBname = TableName.valueOf(testTableName + "B");
final byte[] attrName = Bytes.toBytes("TESTATTR");
final byte[] attrValue = Bytes.toBytes("somevalue");
byte[] value = Bytes.toBytes("value");
@@ -4340,8 +4369,9 @@ public class TestFromClientSide {
// add a user attribute to HTD
desc.setValue(attrName, attrValue);
// add a user attribute to HCD
- for (HColumnDescriptor c : desc.getFamilies())
+ for (HColumnDescriptor c : desc.getFamilies()) {
c.setValue(attrName, attrValue);
+ }
// update metadata for all regions of this table
admin.modifyTable(desc);
// enable the table
@@ -4365,7 +4395,7 @@ public class TestFromClientSide {
@Test
public void testGetClosestRowBefore() throws IOException, InterruptedException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
final byte[] firstRow = Bytes.toBytes("row111");
final byte[] secondRow = Bytes.toBytes("row222");
final byte[] thirdRow = Bytes.toBytes("row333");
@@ -4492,7 +4522,7 @@ public class TestFromClientSide {
@Test
public void testMultiRowMutation() throws Exception {
LOG.info("Starting testMultiRowMutation");
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
final byte [] ROW1 = Bytes.toBytes("testRow1");
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -4524,7 +4554,7 @@ public class TestFromClientSide {
@Test
public void testRowMutation() throws Exception {
LOG.info("Starting testRowMutation");
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") };
RowMutations arm = new RowMutations(ROW);
@@ -4574,7 +4604,7 @@ public class TestFromClientSide {
@Test
public void testBatchAppendWithReturnResultFalse() throws Exception {
LOG.info("Starting testBatchAppendWithReturnResultFalse");
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
Append append1 = new Append(Bytes.toBytes("row1"));
append1.setReturnResults(false);
@@ -4598,7 +4628,7 @@ public class TestFromClientSide {
@Test
public void testAppend() throws Exception {
LOG.info("Starting testAppend");
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[] v1 = Bytes.toBytes("42");
byte[] v2 = Bytes.toBytes("23");
@@ -4690,7 +4720,7 @@ public class TestFromClientSide {
@Test
public void testClientPoolRoundRobin() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
int poolSize = 3;
int numVersions = poolSize * 2;
@@ -4728,7 +4758,7 @@ public class TestFromClientSide {
@Ignore ("Flakey: HBASE-8989") @Test
public void testClientPoolThreadLocal() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
int poolSize = Integer.MAX_VALUE;
int numVersions = 3;
@@ -4814,7 +4844,7 @@ public class TestFromClientSide {
final byte [] anotherrow = Bytes.toBytes("anotherrow");
final byte [] value2 = Bytes.toBytes("abcd");
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
Put put1 = new Put(ROW);
put1.addColumn(FAMILY, QUALIFIER, VALUE);
@@ -4852,7 +4882,7 @@ public class TestFromClientSide {
@Test
public void testCheckAndMutateWithTimeRange() throws IOException {
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
final long ts = System.currentTimeMillis() / 2;
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
@@ -4948,7 +4978,7 @@ public class TestFromClientSide {
final byte [] value3 = Bytes.toBytes("cccc");
final byte [] value4 = Bytes.toBytes("dddd");
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
Put put2 = new Put(ROW);
put2.addColumn(FAMILY, QUALIFIER, value2);
@@ -5030,7 +5060,7 @@ public class TestFromClientSide {
public void testCheckAndDelete() throws IOException {
final byte [] value1 = Bytes.toBytes("aaaa");
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+ try (Table table = TEST_UTIL.createTable(name.getTableName(),
FAMILY)) {
Put put = new Put(ROW);
@@ -5053,7 +5083,7 @@ public class TestFromClientSide {
final byte [] value3 = Bytes.toBytes("cccc");
final byte [] value4 = Bytes.toBytes("dddd");
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+ try (Table table = TEST_UTIL.createTable(name.getTableName(),
FAMILY)) {
Put put2 = new Put(ROW);
@@ -5143,9 +5173,9 @@ public class TestFromClientSide {
* Test ScanMetrics
*/
@Test
- @SuppressWarnings ("unused")
+ @SuppressWarnings({"unused", "checkstyle:EmptyBlock"})
public void testScanMetrics() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
// Set up test table:
// Create table:
@@ -5198,7 +5228,6 @@ public class TestFromClientSide {
// the end of the scanner. So this is asking for 2 of the 3 rows we inserted.
for (Result result : scanner.next(numRecords - 1)) {
}
-
ScanMetrics scanMetrics = scanner.getScanMetrics();
assertEquals("Did not access all the regions in the table", numOfRegions,
scanMetrics.countOfRegions.get());
@@ -5279,7 +5308,7 @@ public class TestFromClientSide {
*/
@Test
public void testCacheOnWriteEvictOnClose() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [] data = Bytes.toBytes("data");
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
@@ -5403,7 +5432,7 @@ public class TestFromClientSide {
*/
public void testNonCachedGetRegionLocation() throws Exception {
// Test Initialization.
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [] family1 = Bytes.toBytes("f1");
byte [] family2 = Bytes.toBytes("f2");
try (Table table = TEST_UTIL.createTable(tableName, new byte[][] {family1, family2}, 10);
@@ -5452,7 +5481,7 @@ public class TestFromClientSide {
// Test Initialization.
byte [] startKey = Bytes.toBytes("ddc");
byte [] endKey = Bytes.toBytes("mmm");
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
TEST_UTIL.createMultiRegionTable(tableName, new byte[][] { FAMILY }, 10);
int numOfRegions = -1;
@@ -5522,7 +5551,7 @@ public class TestFromClientSide {
@Test
public void testJira6912() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table foo = TEST_UTIL.createTable(tableName, new byte[][] {FAMILY}, 10)) {
List<Put> puts = new ArrayList<Put>();
@@ -5551,7 +5580,7 @@ public class TestFromClientSide {
@Test
public void testScan_NullQualifier() throws IOException {
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put);
@@ -5581,7 +5610,7 @@ public class TestFromClientSide {
@Test
public void testNegativeTimestamp() throws IOException {
- try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+ try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
try {
Put put = new Put(ROW, -1);
@@ -5642,7 +5671,7 @@ public class TestFromClientSide {
@Test
public void testRawScanRespectsVersions() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[] row = Bytes.toBytes("row");
@@ -5716,7 +5745,7 @@ public class TestFromClientSide {
@Test
public void testEmptyFilterList() throws Exception {
// Test Initialization.
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
// Insert one row each region
@@ -5756,7 +5785,7 @@ public class TestFromClientSide {
@Test
public void testSmallScan() throws Exception {
// Test Initialization.
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
// Insert one row each region
@@ -5794,7 +5823,7 @@ public class TestFromClientSide {
@Test
public void testSuperSimpleWithReverseScan() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000"));
put.addColumn(FAMILY, QUALIFIER, VALUE);
@@ -5839,7 +5868,7 @@ public class TestFromClientSide {
@Test
public void testFiltersWithReverseScan() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] ROWS = makeN(ROW, 10);
byte[][] QUALIFIERS = {Bytes.toBytes("col0-<d2v1>-<d3v2>"),
@@ -5882,7 +5911,7 @@ public class TestFromClientSide {
@Test
public void testKeyOnlyFilterWithReverseScan() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] ROWS = makeN(ROW, 10);
byte[][] QUALIFIERS = {Bytes.toBytes("col0-<d2v1>-<d3v2>"),
@@ -5923,7 +5952,7 @@ public class TestFromClientSide {
*/
@Test
public void testSimpleMissingWithReverseScan() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] ROWS = makeN(ROW, 4);
@@ -5988,7 +6017,7 @@ public class TestFromClientSide {
@Test
public void testNullWithReverseScan() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
// Null qualifier (should work)
Put put = new Put(ROW);
@@ -6001,7 +6030,8 @@ public class TestFromClientSide {
}
// Use a new table
- try (Table ht = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName() + "2"), FAMILY)) {
+ try (Table ht =
+ TEST_UTIL.createTable(TableName.valueOf(name.getTableName().toString() + "2"), FAMILY)) {
// Empty qualifier, byte[0] instead of null (should work)
Put put = new Put(ROW);
put.addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE);
@@ -6027,7 +6057,7 @@ public class TestFromClientSide {
@Test
@SuppressWarnings("checkstyle:MethodLength")
public void testDeletesWithReverseScan() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte[][] ROWS = makeNAscii(ROW, 6);
byte[][] FAMILIES = makeNAscii(FAMILY, 3);
byte[][] VALUES = makeN(VALUE, 5);
@@ -6213,7 +6243,7 @@ public class TestFromClientSide {
@Test
public void testReversedScanUnderMultiRegions() throws Exception {
// Test Initialization.
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY;
byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
@@ -6274,7 +6304,7 @@ public class TestFromClientSide {
@Test
public void testSmallReversedScanUnderMultiRegions() throws Exception {
// Test Initialization.
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte[][] splitRows = new byte[][]{
Bytes.toBytes("000"), Bytes.toBytes("002"), Bytes.toBytes("004"),
Bytes.toBytes("006"), Bytes.toBytes("008"), Bytes.toBytes("010")};
@@ -6494,7 +6524,7 @@ public class TestFromClientSide {
@Test
public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte[][] VALUES = makeN(VALUE, 5);
long[] ts = {1000, 2000, 3000, 4000, 5000};
@@ -6540,7 +6570,7 @@ public class TestFromClientSide {
@Test
public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte[][] VALUES = makeN(VALUE, 5);
long[] ts = {1000, 2000, 3000, 4000, 5000};
@@ -6603,7 +6633,7 @@ public class TestFromClientSide {
@Test
public void testReadWithFilter() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 3)) {
byte[] VALUEA = Bytes.toBytes("value-a");
@@ -6690,7 +6720,7 @@ public class TestFromClientSide {
@Test
public void testCellUtilTypeMethods() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
final byte[] row = Bytes.toBytes("p");
@@ -6744,7 +6774,7 @@ public class TestFromClientSide {
@Test(expected = DoNotRetryIOException.class)
public void testCreateTableWithZeroRegionReplicas() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf")))
.setRegionReplication(0)
@@ -6755,7 +6785,7 @@ public class TestFromClientSide {
@Test(expected = DoNotRetryIOException.class)
public void testModifyTableWithZeroRegionReplicas() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf")))
.build();
@@ -6770,13 +6800,13 @@ public class TestFromClientSide {
@Test(timeout = 60000)
public void testModifyTableWithMemstoreData() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
createTableAndValidateTableSchemaModification(tableName, true);
}
@Test(timeout = 60000)
public void testDeleteCFWithMemstoreData() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
createTableAndValidateTableSchemaModification(tableName, false);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
index 37d0135..8845f9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
@@ -17,14 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.Arrays;
+import java.util.Collection;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
/**
* Test all client operations with a coprocessor that just implements the default flush/compact/scan
@@ -32,13 +34,24 @@ import org.junit.experimental.categories.Category;
*/
@Category({ LargeTests.class, ClientTests.class })
public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
-
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFromClientSideWithCoprocessor.class);
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- initialize(MultiRowMutationEndpoint.class, NoOpScanPolicyObserver.class);
+ // Override the parameters from the parent class. We just want to run it for the default
+ // param combination.
+ @Parameterized.Parameters
+ public static Collection parameters() {
+ return Arrays.asList(new Object[][] {
+ { ZKConnectionRegistry.class, 1}
+ });
+ }
+
+ public TestFromClientSideWithCoprocessor(Class registry, int numHedgedReqs) throws Exception {
+ if (TEST_UTIL == null) {
+ // It is ok to initialize once because the test is parameterized for a single dimension.
+ initialize(registry, numHedgedReqs, NoOpScanPolicyObserver.class,
+ MultiRowMutationEndpoint.class);
+ }
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
new file mode 100644
index 0000000..335f968
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.junit.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestMasterRegistry {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMasterRegistry.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
+ StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
+ builder.numMasters(3).numRegionServers(3);
+ TEST_UTIL.startMiniCluster(builder.build());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Generates a string of dummy master addresses in host:port format. Every other hostname won't
+ * have a port number.
+ */
+ private static String generateDummyMastersList(int size) {
+ List<String> masters = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ masters.add(" localhost" + (i % 2 == 0 ? ":" + (1000 + i) : ""));
+ }
+ return String.join(",", masters);
+ }
+
+ /**
+ * Makes sure the master registry parses the master end points in the configuration correctly.
+ */
+ @Test public void testMasterAddressParsing() {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ int numMasters = 10;
+ conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
+ try (MasterRegistry registry = new MasterRegistry(conf)) {
+ List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
+ // Half of them would be without a port, duplicates are removed.
+ assertEquals(numMasters/2 + 1, parsedMasters.size());
+ // Sort in the increasing order of port numbers.
+ Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort));
+ for (int i = 0; i < parsedMasters.size(); i++) {
+ ServerName sn = parsedMasters.get(i);
+ assertEquals("localhost", sn.getHostname());
+ if (i == parsedMasters.size() - 1) {
+ // Last entry should be the one with default port.
+ assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort());
+ } else {
+ assertEquals(1000 + (2 * i), sn.getPort());
+ }
+ }
+ }
+ }
+
+ @Test public void testRegistryRPCs() throws Exception {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+ for (int numHedgedReqs = 1; numHedgedReqs <=3; numHedgedReqs++) {
+ if (numHedgedReqs == 1) {
+ conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
+ } else {
+ conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
+ }
+ conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+ try (MasterRegistry registry = new MasterRegistry(conf)) {
+ assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
+ assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
+ List<HRegionLocation> metaLocations =
+ Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
+ List<HRegionLocation> actualMetaLocations = activeMaster.getMetaRegionLocationCache()
+ .getMetaRegionLocations().get();
+ Collections.sort(metaLocations);
+ Collections.sort(actualMetaLocations);
+ assertEquals(actualMetaLocations, metaLocations);
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index c3da587..796ebb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -29,9 +29,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -44,35 +45,38 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.TestTableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
- * A client-side test, mostly testing scanners with various parameters.
+ * A client-side test, mostly testing scanners with various parameters. Parameterized on different
+ * registry implementations.
*/
@Category({MediumTests.class, ClientTests.class})
+@RunWith(Parameterized.class)
public class TestScannersFromClientSide {
@ClassRule
@@ -81,38 +85,80 @@ public class TestScannersFromClientSide {
private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static HBaseTestingUtility TEST_UTIL;
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
@Rule
- public TestName name = new TestName();
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
- TEST_UTIL.startMiniCluster(3);
- }
+ public TestTableName name = new TestTableName();
@AfterClass
public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
+ if (TEST_UTIL != null) {
+ TEST_UTIL.shutdownMiniCluster();
+ }
}
- @Before
- public void setUp() throws Exception {
- // Nothing to do.
+ @Parameterized.Parameters
+ public static Collection parameters() {
+ return Arrays.asList(new Object[][] {
+ { MasterRegistry.class, 1},
+ { MasterRegistry.class, 2},
+ { ZKConnectionRegistry.class, 1}
+ });
}
/**
- * @throws java.lang.Exception
+ * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
+ * there is no easy way to restart the test cluster after each parameterized run. Annotation
+ * BeforeParam does not work either because it runs before parameterization and hence does not
+ * have access to the test parameters (which is weird).
+ *
+ * This *hack* checks if the current instance of test cluster configuration has the passed
+ * parameterized configs. In such a case, we can just reuse the cluster for test and do not need
+ * to initialize from scratch. While this is a hack, it saves a ton of time for the full
+ * test and de-flakes it.
*/
- @After
- public void tearDown() throws Exception {
- // Nothing to do.
+ private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
+ // initialize() is called for every unit test, however we only want to reset the cluster state
+ // at the end of every parameterized run.
+ if (TEST_UTIL == null) {
+ return false;
+ }
+ Configuration conf = TEST_UTIL.getConfiguration();
+ Class confClass = conf.getClass(
+ HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
+ int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
+ HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+ return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
+ }
+
+ public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception {
+ if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
+ return;
+ }
+ if (TEST_UTIL != null) {
+ // We reached the end of a parameterized run, clean up the cluster.
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ TEST_UTIL = new HBaseTestingUtility();
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
+ conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
+ ConnectionRegistry.class);
+ if (numHedgedReqs == 1) {
+ conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
+ } else {
+ Preconditions.checkArgument(numHedgedReqs > 1);
+ conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
+ }
+ conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+ StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
+ // Multiple masters needed only when hedged reads for master registry are enabled.
+ builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
+ TEST_UTIL.startMiniCluster(builder.build());
}
/**
@@ -120,7 +166,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testScanBatch() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@@ -190,7 +236,7 @@ public class TestScannersFromClientSide {
@Test
public void testMaxResultSizeIsSetToDefault() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
Table ht = TEST_UTIL.createTable(tableName, FAMILY);
// The max result size we expect the scan to use by default.
@@ -259,7 +305,7 @@ public class TestScannersFromClientSide {
@Test
public void testSmallScan() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
int numRows = 10;
byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
@@ -292,7 +338,8 @@ public class TestScannersFromClientSide {
/**
* Run through a variety of test configurations with a small scan
*/
- private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
+ private void testSmallScan(
+ Table table, boolean reversed, int rows, int columns) throws Exception {
Scan baseScan = new Scan();
baseScan.setReversed(reversed);
baseScan.setSmall(true);
@@ -334,7 +381,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testGetMaxResults() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@@ -408,8 +455,8 @@ public class TestScannersFromClientSide {
kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
}
for (int i=0; i < 2; i++) {
- kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
- }
+ kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+ }
for (int i=10; i < 20; i++) {
kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
}
@@ -452,7 +499,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testScanMaxResults() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@@ -500,7 +547,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testGetRowOffset() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@@ -519,7 +566,9 @@ public class TestScannersFromClientSide {
KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
put.add(kv);
// skipping first two kvs
- if (i < 2) continue;
+ if (i < 2) {
+ continue;
+ }
kvListExp.add(kv);
}
ht.put(put);
@@ -590,7 +639,7 @@ public class TestScannersFromClientSide {
@Test
public void testScanRawDeleteFamilyVersion() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
TEST_UTIL.createTable(tableName, FAMILY);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(RPC_CODEC_CONF_KEY, "");
@@ -618,7 +667,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testScanOnReopenedRegion() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName = name.getTableName();
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@@ -693,8 +742,9 @@ public class TestScannersFromClientSide {
LOG.info(msg);
LOG.info("Expected count: " + expKvList.size());
LOG.info("Actual count: " + result.size());
- if (expKvList.isEmpty())
+ if (expKvList.isEmpty()) {
return;
+ }
int i = 0;
for (Cell kv : result.rawCells()) {
@@ -715,7 +765,7 @@ public class TestScannersFromClientSide {
@Test
public void testReadExpiredDataForRawScan() throws IOException {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
long ts = System.currentTimeMillis() - 10000;
byte[] value = Bytes.toBytes("expired");
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -735,7 +785,7 @@ public class TestScannersFromClientSide {
@Test
public void testScanWithColumnsAndFilterAndVersion() throws IOException {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
for (int i = 0; i < 4; i++) {
Put put = new Put(ROW);
@@ -757,7 +807,7 @@ public class TestScannersFromClientSide {
@Test
public void testScanWithSameStartRowStopRow() throws IOException {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
@@ -794,7 +844,7 @@ public class TestScannersFromClientSide {
@Test
public void testReverseScanWithFlush() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
+ TableName tableName = name.getTableName();
final int BATCH_SIZE = 10;
final int ROWS_TO_INSERT = 100;
final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ac0d356..2797df3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -30,22 +30,31 @@ import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -54,14 +63,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.util.StringUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
* Some basic ipc tests.
@@ -232,7 +233,6 @@ public abstract class AbstractTestIPC {
/**
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object
- * @throws ServiceException
*/
@Test
public void testRpcServerForNotNullRemoteAddressInCallObject()
@@ -363,6 +363,104 @@ public abstract class AbstractTestIPC {
}
}
+ /**
+ * Tests the various request fan out values using a simple RPC hedged across a mix of running and
+ * failing servers.
+ */
+ @Test
+ public void testHedgedAsyncEcho() throws Exception {
+ // Hedging is not supported for blocking connection types.
+ Assume.assumeFalse(this instanceof TestBlockingIPC);
+ List<RpcServer> rpcServers = new ArrayList<>();
+ List<InetSocketAddress> addresses = new ArrayList<>();
+ // Create a mix of running and failing servers.
+ final int numRunningServers = 5;
+ final int numFailingServers = 3;
+ final int numServers = numRunningServers + numFailingServers;
+ for (int i = 0; i < numRunningServers; i++) {
+ RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+ SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+ new FifoRpcScheduler(CONF, 1));
+ rpcServer.start();
+ addresses.add(rpcServer.getListenerAddress());
+ rpcServers.add(rpcServer);
+ }
+ for (int i = 0; i < numFailingServers; i++) {
+ RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+ SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+ new FifoRpcScheduler(CONF, 1));
+ rpcServer.start();
+ addresses.add(rpcServer.getListenerAddress());
+ rpcServers.add(rpcServer);
+ }
+ Configuration conf = HBaseConfiguration.create();
+ try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+ // Try out various fan out values starting from 1 -> numServers.
+ for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
+ // Update the client's underlying conf, should be ok for the test.
+ LOG.debug("Testing with request fan out: " + reqFanOut);
+ conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
+ Interface stub = newStub(client, addresses);
+ BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
+ stub.echo(new HBaseRpcControllerImpl(),
+ EchoRequestProto.newBuilder().setMessage("hello").build(), done);
+ TestProtos.EchoResponseProto responseProto = done.get();
+ assertNotNull(responseProto);
+ assertEquals("hello", responseProto.getMessage());
+ LOG.debug("Ended test with request fan out: " + reqFanOut);
+ }
+ } finally {
+ for (RpcServer rpcServer: rpcServers) {
+ rpcServer.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testHedgedAsyncTimeouts() throws Exception {
+ // Hedging is not supported for blocking connection types.
+ Assume.assumeFalse(this instanceof TestBlockingIPC);
+ List<RpcServer> rpcServers = new ArrayList<>();
+ List<InetSocketAddress> addresses = new ArrayList<>();
+ final int numServers = 3;
+ for (int i = 0; i < numServers; i++) {
+ RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+ SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+ new FifoRpcScheduler(CONF, 1));
+ rpcServer.start();
+ addresses.add(rpcServer.getListenerAddress());
+ rpcServers.add(rpcServer);
+ }
+ Configuration conf = HBaseConfiguration.create();
+ int timeout = 100;
+ int pauseTime = 1000;
+ try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+ // Try out various fan out values starting from 1 -> numServers.
+ for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
+ // Update the client's underlying conf, should be ok for the test.
+ LOG.debug("Testing with request fan out: " + reqFanOut);
+ conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
+ Interface stub = newStub(client, addresses);
+ HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+ pcrc.setCallTimeout(timeout);
+ BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
+ stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
+ assertNull(callback.get());
+ // Make sure the controller has the right exception propagated.
+ assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
+ LOG.debug("Ended test with request fan out: " + reqFanOut);
+ }
+ } finally {
+ for (RpcServer rpcServer: rpcServers) {
+ rpcServer.stop();
+ }
+ }
+ }
+
+
@Test
public void testAsyncRemoteError() throws IOException {
AbstractRpcClient<?> client = createRpcClient(CONF);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
index d8a2d34..6adfa46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
@@ -17,21 +17,23 @@
*/
package org.apache.hadoop.hbase.ipc;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-
+import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -41,8 +43,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Threads;
@InterfaceAudience.Private
public class TestProtobufRpcServiceImpl implements BlockingInterface {
@@ -67,6 +67,17 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
User.getCurrent(), 0));
}
+ public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
+ throws IOException {
+ Set<ServerName> serverNames = new HashSet<>();
+ for (InetSocketAddress addr: addrs) {
+ serverNames.add(ServerName.valueOf(
+ addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
+ }
+ return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
+ serverNames, User.getCurrent(), 0));
+ }
+
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {