You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jy...@apache.org on 2014/05/23 08:33:34 UTC
git commit: HBASE-11048 Support setting custom priority per client RPC
Repository: hbase
Updated Branches:
refs/heads/master 0b883059e -> c61cb7fb5
HBASE-11048 Support setting custom priority per client RPC
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c61cb7fb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c61cb7fb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c61cb7fb
Branch: refs/heads/master
Commit: c61cb7fb55124547a36a6ef56afaec43676039f8
Parents: 0b88305
Author: Jesse Yates <jy...@apache.org>
Authored: Thu May 22 23:26:54 2014 -0700
Committer: Jesse Yates <jy...@apache.org>
Committed: Thu May 22 23:33:22 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 8 +-
.../hadoop/hbase/client/ClientScanner.java | 46 +++--
.../client/ClientSmallReversedScanner.java | 2 +-
.../hadoop/hbase/client/ClientSmallScanner.java | 28 ++-
.../hadoop/hbase/client/ConnectionManager.java | 5 +-
.../org/apache/hadoop/hbase/client/HTable.java | 41 ++--
.../hbase/client/MultiServerCallable.java | 9 +-
.../hbase/client/ReversedClientScanner.java | 5 +-
.../hbase/client/ReversedScannerCallable.java | 20 +-
.../hadoop/hbase/client/ScannerCallable.java | 16 +-
.../DelegatingPayloadCarryingRpcController.java | 58 ++++++
.../hadoop/hbase/ipc/RpcControllerFactory.java | 59 ++++++
.../hadoop/hbase/client/TestAsyncProcess.java | 5 +-
.../regionserver/wal/WALEditsReplaySink.java | 5 +-
.../hbase/client/HConnectionTestingUtility.java | 6 +-
.../hbase/client/TestRpcControllerFactory.java | 203 +++++++++++++++++++
16 files changed, 450 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index b31e5e3..433322f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.htrace.Trace;
@@ -120,6 +121,7 @@ class AsyncProcess {
protected final ClusterConnection hConnection;
protected final RpcRetryingCallerFactory rpcCallerFactory;
+ protected final RpcControllerFactory rpcFactory;
protected final BatchErrors globalErrors;
protected final ExecutorService pool;
@@ -188,7 +190,7 @@ class AsyncProcess {
}
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
- RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors) {
+ RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
if (hc == null) {
throw new IllegalArgumentException("HConnection cannot be null.");
}
@@ -242,8 +244,8 @@ class AsyncProcess {
serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
}
-
this.rpcCallerFactory = rpcCaller;
+ this.rpcFactory = rpcFactory;
}
private ExecutorService getPool(ExecutorService pool) {
@@ -950,7 +952,7 @@ class AsyncProcess {
@VisibleForTesting
protected MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) {
- return new MultiServerCallable<Row>(hConnection, tableName, server, multi);
+ return new MultiServerCallable<Row>(hConnection, tableName, server, this.rpcFactory, multi);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 574d937..05bde70 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
@@ -66,6 +67,7 @@ public class ClientScanner extends AbstractClientScanner {
protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result []> caller;
+ protected RpcControllerFactory rpcControllerFactory;
/**
* Create a new ClientScanner for the specified table. An HConnection will be
@@ -93,27 +95,36 @@ public class ClientScanner extends AbstractClientScanner {
/**
- * Create a new ClientScanner for the specified table
- * Note that the passed {@link Scan}'s start row maybe changed changed.
- *
- * @param conf The {@link Configuration} to use.
- * @param scan {@link Scan} to use in this scanner
- * @param tableName The table that we wish to scan
- * @param connection Connection identifying the cluster
- * @throws IOException
- */
+ * @deprecated use
+ * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
+ */
+ @Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
HConnection connection) throws IOException {
- this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
+ this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
+ RpcControllerFactory.instantiate(conf));
}
/**
- * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
+ * @deprecated Use
+ * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
*/
@Deprecated
public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
HConnection connection) throws IOException {
- this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf));
+ this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
+ RpcControllerFactory.instantiate(conf));
+ }
+
+ /**
+ * @deprecated Use
+ * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
+ * instead.
+ */
+ @Deprecated
+ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
+ HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
+ this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
}
/**
@@ -126,7 +137,8 @@ public class ClientScanner extends AbstractClientScanner {
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
- HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
+ HConnection connection, RpcRetryingCallerFactory rpcFactory,
+ RpcControllerFactory controllerFactory) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Scan table=" + tableName
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -159,7 +171,8 @@ public class ClientScanner extends AbstractClientScanner {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
}
- this.caller = rpcFactory.<Result[]> newCaller();
+ this.caller = rpcFactory.<Result[]> newCaller();
+ this.rpcControllerFactory = controllerFactory;
initializeScannerInConstruction();
}
@@ -277,8 +290,9 @@ public class ClientScanner extends AbstractClientScanner {
protected ScannerCallable getScannerCallable(byte [] localStartKey,
int nbRows) {
scan.setStartRow(localStartKey);
- ScannerCallable s = new ScannerCallable(getConnection(),
- getTable(), scan, this.scanMetrics);
+ ScannerCallable s =
+ new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
+ this.rpcControllerFactory);
s.setCaching(nbRows);
return s;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index 17b1110..c707e45 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -109,7 +109,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
}
smallScanCallable = ClientSmallScanner.getSmallScanCallable(
- scan, getConnection(), getTable(), localStartKey, cacheNum);
+ scan, getConnection(), getTable(), localStartKey, cacheNum, this.rpcControllerFactory);
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index dd20f0a..1297ee4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -85,10 +86,21 @@ public class ClientSmallScanner extends ClientScanner {
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, HConnection connection) throws IOException {
- this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
+ this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
+ RpcControllerFactory.instantiate(conf)); // same factory lookup as the other constructors
}
/**
+ * @deprecated use
+ * {@link #ClientSmallScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
+ * instead
+ */
+ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
+ HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
+ this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
+ }
+
+ /**
* Create a new ShortClientScanner for the specified table Note that the
* passed {@link Scan}'s start row maybe changed changed.
*
@@ -99,10 +111,10 @@ public class ClientSmallScanner extends ClientScanner {
* @param rpcFactory
* @throws IOException
*/
- public ClientSmallScanner(final Configuration conf, final Scan scan,
- final TableName tableName, HConnection connection,
- RpcRetryingCallerFactory rpcFactory) throws IOException {
- super(conf, scan, tableName, connection, rpcFactory);
+ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
+ HConnection connection, RpcRetryingCallerFactory rpcFactory,
+ RpcControllerFactory controllerFactory) throws IOException {
+ super(conf, scan, tableName, connection, rpcFactory, controllerFactory);
}
@Override
@@ -154,7 +166,7 @@ public class ClientSmallScanner extends ClientScanner {
+ Bytes.toStringBinary(localStartKey) + "'");
}
smallScanCallable = getSmallScanCallable(
- scan, getConnection(), getTable(), localStartKey, cacheNum);
+ scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory);
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
@@ -163,14 +175,14 @@ public class ClientSmallScanner extends ClientScanner {
static RegionServerCallable<Result[]> getSmallScanCallable(
final Scan sc, HConnection connection, TableName table,
- byte[] localStartKey, final int cacheNum) {
+ byte[] localStartKey, final int cacheNum, final RpcControllerFactory rpcControllerFactory) {
sc.setStartRow(localStartKey);
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
connection, table, sc.getStartRow()) {
public Result[] call(int callTimeout) throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), sc, cacheNum, true);
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index a1848dd..53d6690 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -2200,8 +2201,8 @@ class ConnectionManager {
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
- return new AsyncProcess(
- this, conf, this.batchPool, RpcRetryingCallerFactory.instantiate(conf), false);
+ return new AsyncProcess(this, conf, this.batchPool,
+ RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 49d9354..13025b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -146,6 +147,7 @@ public class HTable implements HTableInterface {
/** The Async process for batch */
protected AsyncProcess multiAp;
private RpcRetryingCallerFactory rpcCallerFactory;
+ private RpcControllerFactory rpcControllerFactory;
/**
* Creates an object to access a HBase table.
@@ -362,8 +364,9 @@ public class HTable implements HTableInterface {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
+ this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
// puts need to track errors globally due to how the APIs currently work.
- ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true);
+ ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
multiAp = this.connection.getAsyncProcess();
this.maxKeyValueSize = this.configuration.getInt(
@@ -725,7 +728,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
tableName, row) {
public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
@@ -763,10 +766,10 @@ public class HTable implements HTableInterface {
if (scan.isSmall()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
- this.connection, this.rpcCallerFactory);
+ this.connection, this.rpcCallerFactory, this.rpcControllerFactory);
} else {
- return new ClientScanner(getConfiguration(), scan,
- getName(), this.connection);
+ return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
+ this.rpcCallerFactory, this.rpcControllerFactory);
}
}
@@ -801,7 +804,7 @@ public class HTable implements HTableInterface {
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -902,7 +905,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
tableName, delete.getRow()) {
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
@@ -1042,7 +1045,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
public Void call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1076,7 +1079,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Result> callable =
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
@@ -1107,7 +1110,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getName(), increment.getRow()) {
public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
@@ -1118,8 +1121,8 @@ public class HTable implements HTableInterface {
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
- }
- };
+ }
+ };
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}
@@ -1170,7 +1173,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Long> callable =
new RegionServerCallable<Long>(connection, getName(), row) {
public Long call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
@@ -1200,7 +1203,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1257,7 +1260,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1285,7 +1288,7 @@ public class HTable implements HTableInterface {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
@@ -1761,8 +1764,10 @@ public class HTable implements HTableInterface {
final List<String> callbackErrorServers = new ArrayList<String>();
Object[] results = new Object[execs.size()];
- AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool,
- RpcRetryingCallerFactory.instantiate(configuration), true);
+ AsyncProcess asyncProcess =
+ new AsyncProcess(connection, configuration, pool,
+ RpcRetryingCallerFactory.instantiate(configuration), true,
+ RpcControllerFactory.instantiate(configuration));
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() {
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 47d8d54..cc26ecf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -31,14 +31,15 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import com.google.protobuf.ServiceException;
@@ -51,10 +52,12 @@ import com.google.protobuf.ServiceException;
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
+ private RpcControllerFactory rpcFactory;
MultiServerCallable(final HConnection connection, final TableName tableName,
- final ServerName location, final MultiAction<R> multi) {
+ final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
super(connection, tableName, null);
+ this.rpcFactory = rpcFactory;
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
@@ -115,7 +118,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+ PayloadCarryingRpcController controller = rpcFactory.newController(cells);
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index d6e17ae..727eeca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -125,8 +125,9 @@ public class ReversedClientScanner extends ClientScanner {
protected ScannerCallable getScannerCallable(byte[] localStartKey,
int nbRows, byte[] locateStartRow) {
scan.setStartRow(localStartKey);
- ScannerCallable s = new ReversedScannerCallable(getConnection(),
- getTable(), scan, this.scanMetrics, locateStartRow);
+ ScannerCallable s =
+ new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
+ locateStartRow, this.rpcControllerFactory);
s.setCaching(nbRows);
return s;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index a974b01..f05e381 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -29,8 +29,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.protobuf.RpcController;
+
/**
* A reversed ScannerCallable which supports backward scanning.
*/
@@ -45,17 +48,28 @@ public class ReversedScannerCallable extends ScannerCallable {
protected final byte[] locateStartRow;
/**
- *
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param locateStartRow The start row for locating regions
+ * @param rpcFactory to create an {@link RpcController} to talk to the regionserver
*/
+ public ReversedScannerCallable(HConnection connection, TableName tableName, Scan scan,
+ ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) {
+ super(connection, tableName, scan, scanMetrics, rpcFactory);
+ this.locateStartRow = locateStartRow;
+ }
+
+ /**
+ * @deprecated use
+ * {@link #ReversedScannerCallable(HConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory)}
+ */
+ @Deprecated
public ReversedScannerCallable(HConnection connection, TableName tableName,
Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
- super(connection, tableName, scan, scanMetrics);
- this.locateStartRow = locateStartRow;
+ this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
+ .instantiate(connection.getConfiguration()));
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 798352c..bb29903 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
+import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
@@ -81,22 +83,25 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
+ private RpcControllerFactory rpcFactory;
/**
* @param connection which connection
* @param tableName table callable is on
* @param scan the scan to execute
- * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable
- * won't collect metrics
+ * @param scanMetrics the ScanMetrics to use; if it is null, ScannerCallable won't collect
+ * metrics
+ * @param rpcControllerFactory factory to use when creating {@link RpcController}
*/
public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
- ScanMetrics scanMetrics) {
+ ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
super(connection, tableName, scan.getStartRow());
this.scan = scan;
this.scanMetrics = scanMetrics;
Configuration conf = connection.getConfiguration();
logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
+ this.rpcFactory = rpcControllerFactory;
}
/**
@@ -105,7 +110,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
@Deprecated
public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
ScanMetrics scanMetrics) {
- this(connection, TableName.valueOf(tableName), scan, scanMetrics);
+ this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory
+ .instantiate(connection.getConfiguration()));
}
/**
@@ -161,7 +167,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null;
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ PayloadCarryingRpcController controller = rpcFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
new file mode 100644
index 0000000..9f23770
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Simple delegating controller for use with the {@link RpcControllerFactory} to help override
+ * standard behavior of a {@link PayloadCarryingRpcController}.
+ */
+public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController {
+ private PayloadCarryingRpcController delegate;
+
+ public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public CellScanner cellScanner() {
+ return delegate.cellScanner();
+ }
+
+ @Override
+ public void setCellScanner(final CellScanner cellScanner) {
+ delegate.setCellScanner(cellScanner);
+ }
+
+ @Override
+ public void setPriority(int priority) {
+ delegate.setPriority(priority);
+ }
+
+ @Override
+ public void setPriority(final TableName tn) {
+ delegate.setPriority(tn);
+ }
+
+ @Override
+ public int getPriority() {
+ return delegate.getPriority();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
new file mode 100644
index 0000000..2ffab8d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Factory to create a {@link PayloadCarryingRpcController}
+ */
+public class RpcControllerFactory {
+
+ public static final String CUSTOM_CONTROLLER_CONF_KEY = "hbase.rpc.controllerfactory.class";
+ protected final Configuration conf;
+
+ public RpcControllerFactory(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public PayloadCarryingRpcController newController() {
+ return new PayloadCarryingRpcController();
+ }
+
+ public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
+ return new PayloadCarryingRpcController(cellScanner);
+ }
+
+ public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
+ return new PayloadCarryingRpcController(cellIterables);
+ }
+
+
+ public static RpcControllerFactory instantiate(Configuration configuration) {
+ String rpcControllerFactoryClazz =
+ configuration.get(CUSTOM_CONTROLLER_CONF_KEY,
+ RpcControllerFactory.class.getName());
+ return ReflectionUtils.instantiateWithCustomCtor(rpcControllerFactoryClazz,
+ new Class[] { Configuration.class }, new Object[] { configuration });
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 5445f4d..c31e451 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
@@ -125,14 +126,14 @@ public class TestAsyncProcess {
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
- new RpcRetryingCallerFactory(conf), false);
+ new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
- new RpcRetryingCallerFactory(conf), useGlobalErrors);
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index ffb79c1..1c08fac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
@@ -69,6 +70,7 @@ public class WALEditsReplaySink {
private final AtomicLong totalReplayedEdits = new AtomicLong();
private final boolean skipErrors;
private final int replayTimeout;
+ private RpcControllerFactory rpcControllerFactory;
/**
* Create a sink for WAL log entries replay
@@ -87,6 +89,7 @@ public class WALEditsReplaySink {
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
// a single replay operation time out and default is 60 seconds
this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
+ this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
}
/**
@@ -211,7 +214,7 @@ public class WALEditsReplaySink {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
try {
remoteSvr.replay(controller, p.getFirst());
} catch (ServiceException se) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 35471a9..5a86ab5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.mockito.Mockito;
/**
@@ -121,8 +122,9 @@ public class HConnectionTestingUtility {
}
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
- Mockito.when(c.getAsyncProcess()).thenReturn(new AsyncProcess(
- c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false));
+ Mockito.when(c.getAsyncProcess()).thenReturn(
+ new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
+ RpcControllerFactory.instantiate(conf)));
Mockito.doNothing().when(c).incCount();
Mockito.doNothing().when(c).decCount();
return c;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c61cb7fb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
new file mode 100644
index 0000000..02c2ef8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(MediumTests.class)
+public class TestRpcControllerFactory {
+
+ public static class StaticRpcControllerFactory extends RpcControllerFactory {
+
+ public StaticRpcControllerFactory(Configuration conf) {
+ super(conf);
+ }
+
+ public PayloadCarryingRpcController newController() {
+ return new CountingRpcController(super.newController());
+ }
+
+ public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
+ return new CountingRpcController(super.newController(cellScanner));
+ }
+
+ public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
+ return new CountingRpcController(super.newController(cellIterables));
+ }
+ }
+
+ public static class CountingRpcController extends DelegatingPayloadCarryingRpcController {
+
+ private static AtomicInteger INT_PRIORITY = new AtomicInteger();
+ private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
+
+ public CountingRpcController(PayloadCarryingRpcController delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void setPriority(int priority) {
+ super.setPriority(priority);
+ INT_PRIORITY.incrementAndGet();
+ }
+
+ @Override
+ public void setPriority(TableName tn) {
+ super.setPriority(tn);
+ // ignore counts for system tables - it could change and we really only want to check on what
+ // the client should change
+ if (!tn.isSystemTable()) {
+ TABLE_PRIORITY.incrementAndGet();
+ }
+
+ }
+ }
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // load an endpoint so we have an endpoint to test - it doesn't matter which one, but
+ // this is already in tests, so we can just use it.
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ ProtobufCoprocessorService.class.getName());
+
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Check some of the methods and make sure we are incrementing each time. It's a bit tedious to
+ * cover all methods here, and it is a bit brittle since we can always add new methods but
+ * won't be sure to add them here. So we just cover the major ones.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testCountController() throws Exception {
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ // setup our custom controller
+ conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+ StaticRpcControllerFactory.class.getName());
+
+ TableName name = TableName.valueOf("testcustomcontroller");
+ UTIL.createTable(name, fam1).close();
+
+ // change one of the connection properties so we get a new HConnection with our configuration
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
+
+ HTable table = new HTable(conf, name);
+ table.setAutoFlushTo(false);
+ byte[] row = Bytes.toBytes("row");
+ Put p = new Put(row);
+ p.add(fam1, fam1, Bytes.toBytes("val0"));
+ table.put(p);
+ table.flushCommits();
+ Integer counter = 1;
+ counter = verifyCount(counter);
+
+ Delete d = new Delete(row);
+ d.deleteColumn(fam1, fam1);
+ table.delete(d);
+ counter = verifyCount(counter);
+
+ Put p2 = new Put(row);
+ p2.add(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
+ table.batch(Lists.newArrayList(p, p2), new Object[2]);
+ // this only goes to a single server, so we don't need to change the count here
+ counter = verifyCount(counter);
+
+ Append append = new Append(row);
+ append.add(fam1, fam1, Bytes.toBytes("val2"));
+ table.append(append);
+ counter = verifyCount(counter);
+
+ // and check the major lookup calls as well
+ Get g = new Get(row);
+ table.get(g);
+ counter = verifyCount(counter);
+
+ ResultScanner scan = table.getScanner(fam1);
+ scan.next();
+ scan.close();
+ counter = verifyCount(counter);
+
+ Get g2 = new Get(row);
+ table.get(Lists.newArrayList(g, g2));
+ // same server as above, so no extra adjustment to the count is needed here either
+ counter = verifyCount(counter);
+
+ // make sure all the scanner types are covered
+ Scan scanInfo = new Scan(row);
+ // regular small
+ scanInfo.setSmall(true);
+ counter = doScan(table, scanInfo, counter);
+
+ // reversed, small
+ scanInfo.setReversed(true);
+ counter = doScan(table, scanInfo, counter);
+
+ // reversed, regular
+ scanInfo.setSmall(false);
+ counter = doScan(table, scanInfo, counter);
+
+ table.close();
+ }
+
+ int doScan(HTable table, Scan scan, int expectedCount) throws IOException {
+ ResultScanner results = table.getScanner(scan);
+ results.next();
+ results.close();
+ return verifyCount(expectedCount);
+ }
+
+ int verifyCount(Integer counter) {
+ assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
+ assertEquals(0, CountingRpcController.INT_PRIORITY.get());
+ return counter + 1;
+ }
+}
\ No newline at end of file