You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2023/03/15 15:07:59 UTC
[hbase] branch branch-2 updated: HBASE-27652 Client-side lock contention around Configuration when using read replica regions (#5036)
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new dbb78388e5e HBASE-27652 Client-side lock contention around Configuration when using read replica regions (#5036)
dbb78388e5e is described below
commit dbb78388e5e32bd08409f9a670c8feadce82627c
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Wed Mar 15 16:07:48 2023 +0100
HBASE-27652 Client-side lock contention around Configuration when using read replica regions (#5036)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/client/ConnectionImplementation.java | 8 ++---
.../org/apache/hadoop/hbase/client/HTable.java | 4 +--
.../hadoop/hbase/client/HTableMultiplexer.java | 4 ++-
.../hbase/client/RpcRetryingCallerFactory.java | 38 +++++++++++++---------
.../client/RpcRetryingCallerWithReadReplicas.java | 3 +-
.../hbase/client/ScannerCallableWithReplicas.java | 16 ++++++---
.../hadoop/hbase/client/SecureBulkLoadClient.java | 4 +--
.../hadoop/hbase/client/TestAsyncProcess.java | 9 +++--
.../TestAsyncProcessWithRegionException.java | 7 ++--
.../hadoop/hbase/client/TestClientScanner.java | 10 ++++--
...HRegionServerBulkLoadWithOldSecureEndpoint.java | 3 +-
.../RegionReplicaReplicationEndpoint.java | 5 +--
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 2 +-
.../hbase/client/HConnectionTestingUtility.java | 11 ++++---
.../hbase/client/TestConnectionImplementation.java | 4 +--
.../hbase/client/TestHBaseAdminNoCluster.java | 5 ++-
.../hbase/client/TestReplicaWithCluster.java | 3 +-
.../hbase/quotas/TestLowLatencySpaceQuotas.java | 5 ++-
.../hbase/quotas/TestSpaceQuotaOnBulkLoad.java | 9 +++--
.../hadoop/hbase/quotas/TestSpaceQuotas.java | 9 +++--
.../regionserver/TestHRegionServerBulkLoad.java | 3 +-
.../TestHRegionServerBulkLoadWithOldClient.java | 3 +-
...stRegionReplicaReplicationEndpointNoMaster.java | 5 +--
23 files changed, 111 insertions(+), 59 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 0cf154bd2eb..2340f1f545a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -344,8 +344,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
- this.rpcCallerFactory =
- RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
+ this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig,
+ interceptor, this.stats, this.metrics);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
// Do we publish the status?
@@ -2250,8 +2250,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
- return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(),
- metrics);
+ return RpcRetryingCallerFactory.instantiate(conf, connectionConfig, this.interceptor,
+ this.getStatisticsTracker(), metrics);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 80325abd7f9..44761c2fbf1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1305,8 +1305,8 @@ public class HTable implements Table {
final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()];
- AsyncProcess asyncProcess = new AsyncProcess(
- connection, configuration, RpcRetryingCallerFactory.instantiate(configuration,
+ AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
+ RpcRetryingCallerFactory.instantiate(configuration, connConfiguration,
connection.getStatisticsTracker(), connection.getConnectionMetrics()),
RpcControllerFactory.instantiate(configuration));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 6b54149f9f9..e81be5bcb25 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -423,8 +423,10 @@ public class HTableMultiplexer {
this.addr = addr;
this.multiplexer = htableMultiplexer;
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
+ final ConnectionConfiguration connectionConfig =
+ conn != null ? conn.getConnectionConfiguration() : new ConnectionConfiguration(conf);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf,
- conn == null ? null : conn.getConnectionMetrics());
+ connectionConfig, conn == null ? null : conn.getConnectionMetrics());
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 3e8545f6a38..c062ad43e25 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -29,20 +30,18 @@ public class RpcRetryingCallerFactory {
/** Configuration key for a custom {@link RpcRetryingCaller} */
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
- protected final Configuration conf;
private final ConnectionConfiguration connectionConf;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
private final MetricsConnection metrics;
- public RpcRetryingCallerFactory(Configuration conf) {
- this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
+ public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) {
+ this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
}
- public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
- MetricsConnection metrics) {
- this.conf = conf;
- this.connectionConf = new ConnectionConfiguration(conf);
+ public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf,
+ RetryingCallerInterceptor interceptor, MetricsConnection metrics) {
+ this.connectionConf = connectionConf;
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor;
@@ -71,30 +70,39 @@ public class RpcRetryingCallerFactory {
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
}
+ @RestrictedApi(explanation = "Should only be called on process initialization", link = "",
+ allowedOnPath = ".*/(HRegionServer|LoadIncrementalHFiles|SecureBulkLoadClient)\\.java")
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
MetricsConnection metrics) {
- return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
- metrics);
+ return instantiate(configuration, new ConnectionConfiguration(configuration), metrics);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
- ServerStatisticTracker stats, MetricsConnection metrics) {
- return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
- metrics);
+ ConnectionConfiguration connectionConf, MetricsConnection metrics) {
+ return instantiate(configuration, connectionConf,
+ RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
- RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
+ ConnectionConfiguration connectionConf, ServerStatisticTracker stats,
MetricsConnection metrics) {
+ return instantiate(configuration, connectionConf,
+ RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics);
+ }
+
+ public static RpcRetryingCallerFactory instantiate(Configuration configuration,
+ ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor,
+ ServerStatisticTracker stats, MetricsConnection metrics) {
String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) {
- factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
+ factory = new RpcRetryingCallerFactory(configuration, connectionConf, interceptor, metrics);
} else {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
- new Class[] { Configuration.class }, new Object[] { configuration });
+ new Class[] { Configuration.class, ConnectionConfiguration.class },
+ new Object[] { configuration, connectionConf });
}
return factory;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 5b2208c1cc7..30718742ef7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -81,7 +81,8 @@ public class RpcRetryingCallerWithReadReplicas {
this.operationTimeout = operationTimeout;
this.rpcTimeout = rpcTimeout;
this.timeBeforeReplicas = timeBeforeReplicas;
- this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
+ this.rpcRetryingCallerFactory =
+ new RpcRetryingCallerFactory(conf, cConnection.getConnectionConfiguration());
}
/**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 27cc4d15126..227ad849c84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -55,7 +55,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
volatile ScannerCallable currentScannerCallable;
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
- final ClusterConnection cConnection;
+ private final ClusterConnection cConnection;
protected final ExecutorService pool;
protected final int timeBeforeReplicas;
private final Scan scan;
@@ -175,12 +175,15 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
regionReplication = rl.size();
}
- // allocate a boundedcompletion pool of some multiple of number of replicas.
- // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
+ // allocate a bounded-completion pool of some multiple of number of replicas.
+ // We want to accommodate some RPCs for redundant replica scans (but are still in progress)
+ final ConnectionConfiguration connectionConfig = cConnection != null
+ ? cConnection.getConnectionConfiguration()
+ : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<>(
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
- cConnection == null ? null : cConnection.getConnectionMetrics()),
+ connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()),
pool, regionReplication * 5);
AtomicBoolean done = new AtomicBoolean(false);
@@ -382,9 +385,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// and we can't invoke it multiple times at the same time)
this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
+ final ConnectionConfiguration connectionConfig = cConnection != null
+ ? cConnection.getConnectionConfiguration()
+ : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
this.caller =
RpcRetryingCallerFactory
- .instantiate(ScannerCallableWithReplicas.this.conf,
+ .instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig,
cConnection == null ? null : cConnection.getConnectionMetrics())
.<Result[]> newCaller();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 1838d78eb9f..825a58e7bdd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -69,7 +69,7 @@ public class SecureBulkLoadClient {
return response.getBulkToken();
}
};
- return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null)
+ return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
@@ -91,7 +91,7 @@ public class SecureBulkLoadClient {
return null;
}
};
- RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null).<Void> newCaller()
+ RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller()
.callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index cbb02f59a07..b0ddd83efd3 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -184,13 +184,15 @@ public class TestAsyncProcess {
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
- super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
+ super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
+ new RpcControllerFactory(conf));
service = Executors.newFixedThreadPool(5);
this.conf = conf;
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
- super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
+ super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
+ new RpcControllerFactory(conf));
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new CountingThreadFactory(nbThreads));
}
@@ -1702,7 +1704,8 @@ public class TestAsyncProcess {
static class AsyncProcessForThrowableCheck extends AsyncProcess {
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
- super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
+ super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
+ new RpcControllerFactory(conf));
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
index 98c13761262..e63215c8ea5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
@@ -68,6 +68,8 @@ public class TestAsyncProcessWithRegionException {
private static final Result EMPTY_RESULT = Result.create(null, true);
private static final IOException IOE = new IOException("YOU CAN'T PASS");
private static final Configuration CONF = new Configuration();
+ private static final ConnectionConfiguration CONNECTION_CONFIG =
+ new ConnectionConfiguration(CONF);
private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
@@ -175,7 +177,7 @@ public class TestAsyncProcessWithRegionException {
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
Mockito.when(hc.getConfiguration()).thenReturn(CONF);
- Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
+ Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
Mockito
@@ -196,7 +198,8 @@ public class TestAsyncProcessWithRegionException {
private final ExecutorService service = Executors.newFixedThreadPool(5);
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
- super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
+ super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
+ new RpcControllerFactory(conf));
}
public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index ffc9caa27d7..16f4f687dfe 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -70,6 +70,7 @@ public class TestClientScanner {
Scan scan;
ExecutorService pool;
Configuration conf;
+ ConnectionConfiguration connectionConfig;
ClusterConnection clusterConn;
RpcRetryingCallerFactory rpcFactory;
@@ -86,7 +87,9 @@ public class TestClientScanner {
pool = Executors.newSingleThreadExecutor();
scan = new Scan();
conf = new Configuration();
+ connectionConfig = new ConnectionConfiguration(conf);
Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
+ Mockito.when(clusterConn.getConnectionConfiguration()).thenReturn(connectionConfig);
}
@After
@@ -473,7 +476,7 @@ public class TestClientScanner {
// Mock a caller which calls the callable for ScannerCallableWithReplicas,
// but throws an exception for the actual scanner calls via callWithRetries.
- rpcFactory = new MockRpcRetryingCallerFactory(conf);
+ rpcFactory = new MockRpcRetryingCallerFactory(conf, connectionConfig);
conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
MockRpcRetryingCallerFactory.class.getName());
@@ -496,8 +499,9 @@ public class TestClientScanner {
public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
- public MockRpcRetryingCallerFactory(Configuration conf) {
- super(conf);
+ public MockRpcRetryingCallerFactory(Configuration conf,
+ ConnectionConfiguration connectionConf) {
+ super(conf, connectionConf);
}
@Override
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 60aa57c7e68..5ffd7b92b79 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -125,7 +125,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
return null;
}
};
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index cf0f69372d3..754811ce0e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -390,8 +390,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
this.sink = sink;
this.connection = connection;
this.operationTimeout = operationTimeout;
- this.rpcRetryingCallerFactory = RpcRetryingCallerFactory
- .instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
+ this.rpcRetryingCallerFactory =
+ RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
+ connection.getConnectionConfiguration(), connection.getConnectionMetrics());
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
this.pool = pool;
this.tableDescriptors = tableDescriptors;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 0343c14808a..76b528cc950 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -872,7 +872,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
List<LoadQueueItem> toRetry = new ArrayList<>();
try {
Configuration conf = getConf();
- byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null).<byte[]> newCaller()
+ byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
.callWithRetries(serviceCallable, Integer.MAX_VALUE);
if (region == null) {
LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 139d8bf8b66..a1ff19c2faa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -62,12 +62,14 @@ public class HConnectionTestingUtility {
*/
public static ClusterConnection getMockedConnection(final Configuration conf)
throws ZooKeeperConnectionException {
+ ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf);
ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
Mockito.when(connection.getConfiguration()).thenReturn(conf);
+ Mockito.when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
Mockito.when(connection.getRpcControllerFactory())
.thenReturn(Mockito.mock(RpcControllerFactory.class));
// we need a real retrying caller
- RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig);
Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
return connection;
}
@@ -123,11 +125,12 @@ public class HConnectionTestingUtility {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
AsyncProcess asyncProcess = new AsyncProcess(c, conf,
- RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()),
+ RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration, c.getConnectionMetrics()),
RpcControllerFactory.instantiate(conf));
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
- Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory
- .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
+ Mockito.when(c.getNewRpcRetryingCallerFactory(conf))
+ .thenReturn(RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration,
+ RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
Table t = Mockito.mock(Table.class);
Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
index 71a56c19418..fd80682d7d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
@@ -1138,8 +1138,8 @@ public class TestConnectionImplementation {
private final Class<? extends HBaseServerException> exceptionClass;
- public ThrowingCallerFactory(Configuration conf) {
- super(conf);
+ public ThrowingCallerFactory(Configuration conf, ConnectionConfiguration connectionConfig) {
+ super(conf, connectionConfig);
this.exceptionClass =
conf.getClass("testSpecialPauseException", null, HBaseServerException.class);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java
index 30f235258c6..dd39e192130 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java
@@ -272,6 +272,8 @@ public class TestHBaseAdminNoCluster {
ClusterConnection connection = mock(ClusterConnection.class);
when(connection.getConfiguration()).thenReturn(configuration);
+ ConnectionConfiguration connectionConfig = new ConnectionConfiguration(configuration);
+ when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
MasterKeepAliveConnection masterAdmin = mock(MasterKeepAliveConnection.class, new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -287,7 +289,8 @@ public class TestHBaseAdminNoCluster {
when(rpcControllerFactory.newController()).thenReturn(mock(HBaseRpcController.class));
// we need a real retrying caller
- RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration);
+ RpcRetryingCallerFactory callerFactory =
+ new RpcRetryingCallerFactory(configuration, connectionConfig);
when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
Admin admin = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 448152b454d..1646a3b81a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -504,7 +504,8 @@ public class TestReplicaWithCluster {
return null;
}
};
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(HTU.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.newCaller();
caller.callWithRetries(callable, 10000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
index 3bd7243e5c3..9da13aa5a25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
@@ -236,7 +237,9 @@ public class TestLowLatencySpaceQuotas {
totalSize += file.getLen();
}
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+ final ClusterConnection clusterConn = (ClusterConnection) conn;
+ RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(),
+ clusterConn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java
index bae13cce052..0d2dc99e211 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -98,7 +99,9 @@ public class TestSpaceQuotaOnBulkLoad {
// The table is now in violation. Try to do a bulk load
ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+ ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
try {
caller.callWithRetries(callable, Integer.MAX_VALUE);
@@ -157,7 +160,9 @@ public class TestSpaceQuotaOnBulkLoad {
LOG.debug(file.getPath() + " -> " + file.getLen() + "B");
}
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+ ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
try {
caller.callWithRetries(callable, Integer.MAX_VALUE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
index 99d575eb529..7fad509d81f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
@@ -242,7 +243,9 @@ public class TestSpaceQuotas {
// The table is now in violation. Try to do a bulk load
ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+ ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.newCaller();
try {
caller.callWithRetries(callable, Integer.MAX_VALUE);
@@ -301,7 +304,9 @@ public class TestSpaceQuotas {
LOG.debug(file.getPath() + " -> " + file.getLen() + "B");
}
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+ ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.newCaller();
try {
caller.callWithRetries(callable, Integer.MAX_VALUE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 354e0124b5b..751d782f601 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -230,7 +230,8 @@ public class TestHRegionServerBulkLoad {
return null;
}
};
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index 69e4384f566..b4e00a90df2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -112,7 +112,8 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
return null;
}
};
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCallerFactory factory =
+ new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 7d588712c33..91e04fe6e10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -209,8 +209,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row,
Lists.newArrayList(entry), new AtomicLong());
- RpcRetryingCallerFactory factory = RpcRetryingCallerFactory
- .instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
+ RpcRetryingCallerFactory factory =
+ RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
+ connection.getConnectionConfiguration(), connection.getConnectionMetrics());
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
}
}