You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jy...@apache.org on 2016/03/05 20:16:17 UTC
[3/3] hbase git commit: HBASE-14703 HTable.mutateRow does not collect
stats (Heng Chen)
HBASE-14703 HTable.mutateRow does not collect stats (Heng Chen)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef712df9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef712df9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef712df9
Branch: refs/heads/master
Commit: ef712df944b0745892bc13bcecdfd6e358a71b66
Parents: d083e4f
Author: Jesse Yates <jy...@apache.org>
Authored: Fri Mar 4 19:07:59 2016 -0800
Committer: Jesse Yates <jy...@apache.org>
Committed: Sat Mar 5 11:01:45 2016 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 114 +-
.../org/apache/hadoop/hbase/client/HTable.java | 151 +-
.../hadoop/hbase/client/MetricsConnection.java | 10 +-
.../hadoop/hbase/client/MultiResponse.java | 57 +-
.../hbase/client/MultiServerCallable.java | 17 +-
.../client/PayloadCarryingServerCallable.java | 48 +
.../hadoop/hbase/client/ResultStatsUtil.java | 12 +-
.../hbase/client/RetryingTimeTracker.java | 57 +
.../hbase/client/RpcRetryingCallerFactory.java | 6 -
.../hbase/client/RpcRetryingCallerImpl.java | 34 +-
.../hbase/client/ServerStatisticTracker.java | 3 +-
.../hadoop/hbase/client/StatisticTrackable.java | 33 +
.../client/StatsTrackingRpcRetryingCaller.java | 77 -
.../hadoop/hbase/protobuf/ProtobufUtil.java | 4 +-
.../hbase/protobuf/ResponseConverter.java | 28 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 18 +-
.../hbase/protobuf/generated/ClientProtos.java | 1452 +++++++++++++++++-
hbase-protocol/src/main/protobuf/Client.proto | 8 +-
.../hadoop/hbase/regionserver/HRegion.java | 4 +-
.../hbase/regionserver/RSRpcServices.java | 68 +-
.../hadoop/hbase/client/TestCheckAndMutate.java | 9 +-
.../hadoop/hbase/client/TestClientPushback.java | 29 +
.../hadoop/hbase/client/TestFromClientSide.java | 8 +-
.../hadoop/hbase/client/TestReplicasClient.java | 5 +-
24 files changed, 1891 insertions(+), 361 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 65c15ce..7b0016c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
@@ -428,7 +429,7 @@ class AsyncProcess {
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
- locationErrors, locationErrorRows, actionsByServer, pool);
+ locationErrors, locationErrorRows, actionsByServer, pool);
}
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
@@ -444,7 +445,7 @@ class AsyncProcess {
int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction();
ars.manageError(originalIndex, row,
- Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
+ Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
}
}
ars.sendMultiAction(actionsByServer, 1, null, false);
@@ -546,9 +547,13 @@ class AsyncProcess {
*/
public <CResult> AsyncRequestFuture submitAll(TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
- return submitAll(null, tableName, rows, callback, results);
+ return submitAll(null, tableName, rows, callback, results, null, timeout);
}
+ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
+ List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
+ return submitAll(pool, tableName, rows, callback, results, null, timeout);
+ }
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
* compatibility: it allows to be used with the batch interface that return an array of objects.
@@ -560,7 +565,8 @@ class AsyncProcess {
* @param results Optional array to return the results thru; backward compat.
*/
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
- List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
+ List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
+ PayloadCarryingServerCallable callable, int curTimeout) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array returned.
@@ -579,7 +585,8 @@ class AsyncProcess {
actions.add(action);
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
- tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
+ tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
+ callable, curTimeout);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
@@ -711,11 +718,11 @@ class AsyncProcess {
private final MultiAction<Row> multiAction;
private final int numAttempt;
private final ServerName server;
- private final Set<MultiServerCallable<Row>> callsInProgress;
+ private final Set<PayloadCarryingServerCallable> callsInProgress;
private SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server,
- Set<MultiServerCallable<Row>> callsInProgress) {
+ Set<PayloadCarryingServerCallable> callsInProgress) {
this.multiAction = multiAction;
this.numAttempt = numAttempt;
this.server = server;
@@ -725,19 +732,22 @@ class AsyncProcess {
@Override
public void run() {
MultiResponse res;
- MultiServerCallable<Row> callable = null;
+ PayloadCarryingServerCallable callable = currentCallable;
try {
- callable = createCallable(server, tableName, multiAction);
+ // setup the callable based on the actions, if we don't have one already from the request
+ if (callable == null) {
+ callable = createCallable(server, tableName, multiAction);
+ }
+ RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
try {
- RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
- if (callsInProgress != null) callsInProgress.add(callable);
- res = caller.callWithoutRetries(callable, timeout);
-
+ if (callsInProgress != null) {
+ callsInProgress.add(callable);
+ }
+ res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
if (res == null) {
// Cancelled
return;
}
-
} catch (IOException e) {
// The service itself failed . It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server.
@@ -771,7 +781,7 @@ class AsyncProcess {
private final BatchErrors errors;
private final ConnectionImplementation.ServerErrorTracker errorsByServer;
private final ExecutorService pool;
- private final Set<MultiServerCallable<Row>> callsInProgress;
+ private final Set<PayloadCarryingServerCallable> callsInProgress;
private final TableName tableName;
@@ -798,10 +808,12 @@ class AsyncProcess {
private final int[] replicaGetIndices;
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
+ private PayloadCarryingServerCallable currentCallable;
+ private int currentCallTotalTimeout;
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
- Batch.Callback<CResult> callback) {
+ Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) {
this.pool = pool;
this.callback = callback;
this.nonceGroup = nonceGroup;
@@ -865,13 +877,16 @@ class AsyncProcess {
this.replicaGetIndices = null;
}
this.callsInProgress = !hasAnyReplicaGets ? null :
- Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
+ Collections.newSetFromMap(
+ new ConcurrentHashMap<PayloadCarryingServerCallable, Boolean>());
this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
+ this.currentCallable = callable;
+ this.currentCallTotalTimeout = timeout;
}
- public Set<MultiServerCallable<Row>> getCallsInProgress() {
+ public Set<PayloadCarryingServerCallable> getCallsInProgress() {
return callsInProgress;
}
@@ -1275,11 +1290,15 @@ class AsyncProcess {
int failureCount = 0;
boolean canRetry = true;
- // Go by original action.
+ Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
+ updateStats(server, results);
+
int failed = 0, stopped = 0;
+ // Go by original action.
for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
byte[] regionName = regionEntry.getKey();
- Map<Integer, Object> regionResults = responses.getResults().get(regionName);
+ Map<Integer, Object> regionResults = results.get(regionName) == null
+ ? null : results.get(regionName).result;
if (regionResults == null) {
if (!responses.getExceptions().containsKey(regionName)) {
LOG.error("Server sent us neither results nor exceptions for "
@@ -1308,7 +1327,7 @@ class AsyncProcess {
}
++failureCount;
Retry retry = manageError(sentAction.getOriginalIndex(), row,
- canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
+ canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
if (retry == Retry.YES) {
toReplay.add(sentAction);
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
@@ -1317,24 +1336,11 @@ class AsyncProcess {
++failed;
}
} else {
-
- if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
- AsyncProcess.this.connection.getConnectionMetrics().
- updateServerStats(server, regionName, result);
- }
-
- // update the stats about the region, if its a user table. We don't want to slow down
- // updates to meta tables, especially from internal updates (master, etc).
- if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
- result = ResultStatsUtil.updateStats(result,
- AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
- }
-
if (callback != null) {
try {
//noinspection unchecked
// TODO: would callback expect a replica region name if it gets one?
- this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
+ this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
} catch (Throwable t) {
LOG.error("User callback threw an exception for "
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
@@ -1384,7 +1390,6 @@ class AsyncProcess {
}
}
}
-
if (toReplay.isEmpty()) {
logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
} else {
@@ -1438,8 +1443,8 @@ class AsyncProcess {
boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
int index = action.getOriginalIndex();
if (results == null) {
- decActionCounter(index);
- return; // Simple case, no replica requests.
+ decActionCounter(index);
+ return; // Simple case, no replica requests.
}
state = trySetResultSimple(index, action.getAction(), false, result, null, isStale);
if (state == null) {
@@ -1618,7 +1623,7 @@ class AsyncProcess {
throw new InterruptedIOException(iex.getMessage());
} finally {
if (callsInProgress != null) {
- for (MultiServerCallable<Row> clb : callsInProgress) {
+ for (PayloadCarryingServerCallable clb : callsInProgress) {
clb.cancel();
}
}
@@ -1675,13 +1680,38 @@ class AsyncProcess {
}
}
+ private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
+ boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null;
+ boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null;
+ if (!stats && !metrics) {
+ return;
+ }
+ for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
+ byte[] regionName = regionStats.getKey();
+ ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
+ ResultStatsUtil.updateStats(AsyncProcess.this.connection.getStatisticsTracker(), server,
+ regionName, stat);
+ ResultStatsUtil.updateStats(AsyncProcess.this.connection.getConnectionMetrics(),
+ server, regionName, stat);
+ }
+ }
+
+ protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
+ TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
+ Batch.Callback<CResult> callback, Object[] results, boolean needResults,
+ PayloadCarryingServerCallable callable, int curTimeout) {
+ return new AsyncRequestFutureImpl<CResult>(
+ tableName, actions, nonceGroup, getPool(pool), needResults,
+ results, callback, callable, curTimeout);
+ }
+
@VisibleForTesting
/** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
- return new AsyncRequestFutureImpl<CResult>(
- tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
+ return createAsyncRequestFuture(
+ tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout);
}
/**
@@ -1697,7 +1727,7 @@ class AsyncProcess {
* Create a caller. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
- protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+ protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) {
return rpcCallerFactory.<MultiResponse> newCaller();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index ce5a44c..33fd94e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
@@ -263,7 +265,8 @@ public class HTable implements HTableInterface {
*/
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
- HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout);
+ HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection,
+ rpcCallerFactory, operationTimeout);
if (htd != null) {
return new UnmodifyableHTableDescriptor(htd);
}
@@ -450,10 +453,10 @@ public class HTable implements HTableInterface {
// Call that takes into account the replica
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
- rpcControllerFactory, tableName, this.connection, get, pool,
- tableConfiguration.getRetriesNumber(),
- operationTimeout,
- tableConfiguration.getPrimaryCallTimeoutMicroSecond());
+ rpcControllerFactory, tableName, this.connection, get, pool,
+ tableConfiguration.getRetriesNumber(),
+ operationTimeout,
+ tableConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call();
}
@@ -587,35 +590,47 @@ public class HTable implements HTableInterface {
*/
@Override
public void mutateRow(final RowMutations rm) throws IOException {
- RegionServerCallable<Void> callable =
- new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
- @Override
- public Void call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
- getLocation().getRegionInfo().getRegionName(), rm);
- regionMutationBuilder.setAtomic(true);
- MultiRequest request =
- MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- ClientProtos.MultiResponse response = getStub().multi(controller, request);
- ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
- if (res.hasException()) {
- Throwable ex = ProtobufUtil.toException(res.getException());
- if(ex instanceof IOException) {
- throw (IOException)ex;
+ final RetryingTimeTracker tracker = new RetryingTimeTracker();
+ PayloadCarryingServerCallable<MultiResponse> callable =
+ new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+ rpcControllerFactory) {
+ @Override
+ public MultiResponse call(int callTimeout) throws IOException {
+ tracker.start();
+ controller.setPriority(tableName);
+ int remainingTime = tracker.getRemainingTime(callTimeout);
+ if (remainingTime == 0) {
+ throw new DoNotRetryIOException("Timeout for mutate row");
+ }
+ controller.setCallTimeout(remainingTime);
+ try {
+ RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
+ getLocation().getRegionInfo().getRegionName(), rm);
+ regionMutationBuilder.setAtomic(true);
+ MultiRequest request =
+ MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+ ClientProtos.MultiResponse response = getStub().multi(controller, request);
+ ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+ if (res.hasException()) {
+ Throwable ex = ProtobufUtil.toException(res.getException());
+ if (ex instanceof IOException) {
+ throw (IOException) ex;
+ }
+ throw new IOException("Failed to mutate row: " +
+ Bytes.toStringBinary(rm.getRow()), ex);
}
- throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
+ return ResponseConverter.getResults(request, response, controller.cellScanner());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
}
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
}
- return null;
- }
- };
- rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
+ };
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
+ null, null, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
}
/**
@@ -860,37 +875,55 @@ public class HTable implements HTableInterface {
*/
@Override
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final RowMutations rm)
- throws IOException {
- RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- CompareType compareType = CompareType.valueOf(compareOp.name());
- MultiRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, rm);
- ClientProtos.MultiResponse response = getStub().multi(controller, request);
- ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
- if (res.hasException()) {
- Throwable ex = ProtobufUtil.toException(res.getException());
- if(ex instanceof IOException) {
- throw (IOException)ex;
- }
- throw new IOException("Failed to checkAndMutate row: "+
- Bytes.toStringBinary(rm.getRow()), ex);
+ final CompareOp compareOp, final byte [] value, final RowMutations rm)
+ throws IOException {
+ final RetryingTimeTracker tracker = new RetryingTimeTracker();
+ PayloadCarryingServerCallable<MultiResponse> callable =
+ new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+ rpcControllerFactory) {
+ @Override
+ public MultiResponse call(int callTimeout) throws IOException {
+ tracker.start();
+ controller.setPriority(tableName);
+ int remainingTime = tracker.getRemainingTime(callTimeout);
+ if (remainingTime == 0) {
+ throw new DoNotRetryIOException("Timeout for mutate row");
+ }
+ controller.setCallTimeout(remainingTime);
+ try {
+ CompareType compareType = CompareType.valueOf(compareOp.name());
+ MultiRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), compareType, rm);
+ ClientProtos.MultiResponse response = getStub().multi(controller, request);
+ ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+ if (res.hasException()) {
+ Throwable ex = ProtobufUtil.toException(res.getException());
+ if(ex instanceof IOException) {
+ throw (IOException)ex;
}
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ throw new IOException("Failed to checkAndMutate row: "+
+ Bytes.toStringBinary(rm.getRow()), ex);
}
+ return ResponseConverter.getResults(request, response, controller.cellScanner());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
}
- };
- return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
+ }
+ };
+ /**
+ * Currently, we use one array to store 'processed' flag which is returned by server.
+ * It is excessive to send such a large array, but that is required by the framework right now
+ * */
+ Object[] results = new Object[rm.getMutations().size()];
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
+ null, results, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
+
+ return ((Result)results[0]).getExists();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 6a292bc..400f505 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -52,7 +52,7 @@ import static com.codahale.metrics.MetricRegistry.name;
* {@link #shutdown()} to terminate the thread pools they allocate.
*/
@InterfaceAudience.Private
-public class MetricsConnection {
+public class MetricsConnection implements StatisticTrackable {
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
@@ -199,9 +199,15 @@ public class MetricsConnection {
}
Result result = (Result) r;
ClientProtos.RegionLoadStats stats = result.getStats();
- if(stats == null){
+ if (stats == null) {
return;
}
+ updateRegionStats(serverName, regionName, stats);
+ }
+
+ @Override
+ public void updateRegionStats(ServerName serverName, byte[] regionName,
+ ClientProtos.RegionLoadStats stats) {
String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
ConcurrentMap<byte[], RegionStats> rsStats = null;
if (serverStats.containsKey(serverName)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
index 089ccff..79a9ed3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -33,8 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
public class MultiResponse {
// map of regionName to map of Results by the original index for that Result
- private Map<byte[], Map<Integer, Object>> results =
- new TreeMap<byte[], Map<Integer, Object>>(Bytes.BYTES_COMPARATOR);
+ private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
/**
* The server can send us a failure for the region itself, instead of individual failure.
@@ -52,8 +52,8 @@ public class MultiResponse {
*/
public int size() {
int size = 0;
- for (Map<?,?> c : results.values()) {
- size += c.size();
+ for (RegionResult result: results.values()) {
+ size += result.size();
}
return size;
}
@@ -66,16 +66,7 @@ public class MultiResponse {
* @param resOrEx the result or error; will be empty for successful Put and Delete actions.
*/
public void add(byte[] regionName, int originalIndex, Object resOrEx) {
- Map<Integer, Object> rs = results.get(regionName);
- if (rs == null) {
- rs = new HashMap<Integer, Object>();
- results.put(regionName, rs);
- }
- rs.put(originalIndex, resOrEx);
- }
-
- public Map<byte[], Map<Integer, Object>> getResults() {
- return results;
+ getResult(regionName).addResult(originalIndex, resOrEx);
}
public void addException(byte []regionName, Throwable ie){
@@ -92,4 +83,42 @@ public class MultiResponse {
public Map<byte[], Throwable> getExceptions() {
return exceptions;
}
+
+ public void addStatistic(byte[] regionName, ClientProtos.RegionLoadStats stat) {
+ getResult(regionName).setStat(stat);
+ }
+
+ private RegionResult getResult(byte[] region){
+ RegionResult rs = results.get(region);
+ if (rs == null) {
+ rs = new RegionResult();
+ results.put(region, rs);
+ }
+ return rs;
+ }
+
+ public Map<byte[], RegionResult> getResults(){
+ return this.results;
+ }
+
+ static class RegionResult{
+ Map<Integer, Object> result = new HashMap<>();
+ ClientProtos.RegionLoadStats stat;
+
+ public void addResult(int index, Object result){
+ this.result.put(index, result);
+ }
+
+ public void setStat(ClientProtos.RegionLoadStats stat){
+ this.stat = stat;
+ }
+
+ public int size() {
+ return this.result.size();
+ }
+
+ public ClientProtos.RegionLoadStats getStat() {
+ return this.stat;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 85b401e..f78f348 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -50,21 +49,19 @@ import com.google.protobuf.ServiceException;
* {@link RegionServerCallable} that goes against multiple regions.
* @param <R>
*/
-class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
+class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
- private final PayloadCarryingRpcController controller;
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
- super(connection, tableName, null);
+ super(connection, tableName, null, rpcFactory);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
// we will store the server here, and throw if someone tries to obtain location/regioninfo.
this.location = new HRegionLocation(null, location);
this.cellBlock = isCellBlock();
- controller = rpcFactory.newController();
}
@Override
@@ -133,16 +130,6 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> impleme
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
}
- @Override
- public void cancel() {
- controller.startCancel();
- }
-
- @Override
- public boolean isCancelled() {
- return controller.isCanceled();
- }
-
/**
* @return True if we should send data in cellblocks. This is an expensive call. Cache the
* result if you can rather than call each time.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
new file mode 100644
index 0000000..d94f069
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * This class is used to unify HTable calls with AsyncProcess Framework.
+ * HTable can use AsyncProcess directly though this class.
+ */
+@InterfaceAudience.Private
+public abstract class PayloadCarryingServerCallable<T>
+ extends RegionServerCallable<T> implements Cancellable {
+ protected PayloadCarryingRpcController controller;
+
+ public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
+ RpcControllerFactory rpcControllerFactory) {
+ super(connection, tableName, row);
+ this.controller = rpcControllerFactory.newController();
+ }
+
+ @Override
+ public void cancel() {
+ controller.startCancel();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return controller.isCanceled();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
index 3caa63e..6537d79 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java
@@ -55,13 +55,17 @@ public final class ResultStatsUtil {
return r;
}
- if (regionName != null) {
- serverStats.updateRegionStats(server, regionName, stats);
- }
-
+ updateStats(serverStats, server, regionName, stats);
return r;
}
+ public static void updateStats(StatisticTrackable tracker, ServerName server, byte[] regionName,
+ ClientProtos.RegionLoadStats stats) {
+ if (regionName != null && stats != null && tracker != null) {
+ tracker.updateRegionStats(server, regionName, stats);
+ }
+ }
+
public static <T> T updateStats(T r, ServerStatisticTracker stats,
HRegionLocation regionLocation) {
byte[] regionName = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
new file mode 100644
index 0000000..24288e6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Tracks the amount of time remaining for an operation.
+ */
+class RetryingTimeTracker {
+
+ private long globalStartTime = -1;
+
+ public void start() {
+ if (this.globalStartTime < 0) {
+ this.globalStartTime = EnvironmentEdgeManager.currentTime();
+ }
+ }
+
+ public int getRemainingTime(int callTimeout) {
+ if (callTimeout <= 0) {
+ return 0;
+ } else {
+ if (callTimeout == Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ int remainingTime = (int) (
+ callTimeout -
+ (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+ if (remainingTime < 1) {
+ // If there is no time left, we're trying anyway. It's too late.
+ // 0 means no timeout, and it's not the intent here. So we secure both cases by
+ // resetting to the minimum.
+ remainingTime = 1;
+ }
+ return remainingTime;
+ }
+ }
+
+ public long getStartTime() {
+ return this.globalStartTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 0af8210..550812f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -67,12 +67,6 @@ public class RpcRetryingCallerFactory {
// is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor,
startLogErrorsCnt);
-
- // wrap it with stats, if we are tracking them
- if (enableBackPressure && this.stats != null) {
- caller = new StatsTrackingRpcRetryingCaller<T>(caller, this.stats);
- }
-
return caller;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
index 12abc6a..6ce4956 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -51,10 +51,6 @@ import com.google.protobuf.ServiceException;
public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
// LOG is being used in TestMultiRowRangeFilter, hence leaving it public
public static final Log LOG = LogFactory.getLog(RpcRetryingCallerImpl.class);
- /**
- * When we started making calls.
- */
- private long globalStartTime;
/** How many retries are allowed before we start to log */
private final int startLogErrorsCnt;
@@ -64,6 +60,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final RetryingCallerInterceptor interceptor;
private final RetryingCallerInterceptorContext context;
+ private final RetryingTimeTracker tracker;
public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
@@ -76,23 +73,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
this.interceptor = interceptor;
context = interceptor.createEmptyContext();
this.startLogErrorsCnt = startLogErrorsCnt;
- }
-
- private int getRemainingTime(int callTimeout) {
- if (callTimeout <= 0) {
- return 0;
- } else {
- if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
- int remainingTime = (int) (callTimeout -
- (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
- if (remainingTime < 1) {
- // If there is no time left, we're trying anyway. It's too late.
- // 0 means no timeout, and it's not the intent here. So we secure both cases by
- // resetting to the minimum.
- remainingTime = 1;
- }
- return remainingTime;
- }
+ this.tracker = new RetryingTimeTracker();
}
@Override
@@ -108,21 +89,21 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
throws IOException, RuntimeException {
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
- this.globalStartTime = EnvironmentEdgeManager.currentTime();
+ tracker.start();
context.clear();
for (int tries = 0;; tries++) {
long expectedSleep;
try {
callable.prepare(tries != 0); // if called with false, check table status on ZK
interceptor.intercept(context.prepare(callable, tries));
- return callable.call(getRemainingTime(callTimeout));
+ return callable.call(tracker.getRemainingTime(callTimeout));
} catch (PreemptiveFastFailException e) {
throw e;
} catch (Throwable t) {
ExceptionUtil.rethrowIfInterrupt(t);
if (tries > startLogErrorsCnt) {
LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started="
- + (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
+ + (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms ago, "
+ "cancelled=" + cancelled.get() + ", msg="
+ callable.getExceptionMessageAdditionalDetail());
}
@@ -172,14 +153,13 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
* @return Calculate how long a single call took
*/
private long singleCallDuration(final long expectedSleep) {
- return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
+ return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;
}
@Override
public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException {
// The code of this method should be shared with withRetries.
- this.globalStartTime = EnvironmentEdgeManager.currentTime();
try {
callable.prepare(false);
return callable.call(callTimeout);
@@ -231,7 +211,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
@Override
public String toString() {
- return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
+ return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +
", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
index d03ecf6..b8e7923 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -31,11 +31,12 @@ import java.util.concurrent.ConcurrentHashMap;
* Tracks the statistics for multiple regions
*/
@InterfaceAudience.Private
-public class ServerStatisticTracker {
+public class ServerStatisticTracker implements StatisticTrackable {
private final ConcurrentHashMap<ServerName, ServerStatistics> stats =
new ConcurrentHashMap<ServerName, ServerStatistics>();
+ @Override
public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
currentStats) {
ServerStatistics stat = stats.get(server);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
new file mode 100644
index 0000000..7bb49e7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticTrackable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+/**
+ * Parent interface for an object to get updates about per-region statistics.
+ */
+@InterfaceAudience.Private
+public interface StatisticTrackable {
+ /**
+ * Update stats per region.
+ * */
+ void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats
+ stats);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
deleted file mode 100644
index e82f1e8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-import java.io.IOException;
-
-/**
- * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return,
- * if stats are available
- */
-@InterfaceAudience.Private
-public class StatsTrackingRpcRetryingCaller<T> implements RpcRetryingCaller<T> {
- private final ServerStatisticTracker stats;
- private final RpcRetryingCaller<T> delegate;
-
- public StatsTrackingRpcRetryingCaller(RpcRetryingCaller<T> delegate,
- ServerStatisticTracker stats) {
- this.delegate = delegate;
- this.stats = stats;
- }
-
- @Override
- public void cancel() {
- delegate.cancel();
- }
-
- @Override
- public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
- throws IOException, RuntimeException {
- T result = delegate.callWithRetries(callable, callTimeout);
- return updateStatsAndUnwrap(result, callable);
- }
-
- @Override
- public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
- throws IOException, RuntimeException {
- T result = delegate.callWithRetries(callable, callTimeout);
- return updateStatsAndUnwrap(result, callable);
- }
-
- private T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
- // don't track stats about requests that aren't to regionservers
- if (!(callable instanceof RegionServerCallable)) {
- return result;
- }
-
- // mutli-server callables span multiple regions, so they don't have a location,
- // but they are region server callables, so we have to handle them when we process the
- // result in AsyncProcess#receiveMultiAction, not in here
- if (callable instanceof MultiServerCallable) {
- return result;
- }
-
- // update the stats for the single server callable
- RegionServerCallable<T> regionCallable = (RegionServerCallable) callable;
- HRegionLocation location = regionCallable.getLocation();
- return ResultStatsUtil.updateStats(result, stats, location);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 261a9aa..b052e63 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -193,8 +193,8 @@ public final class ProtobufUtil {
*/
private final static Cell[] EMPTY_CELL_ARRAY = new Cell[]{};
private final static Result EMPTY_RESULT = Result.create(EMPTY_CELL_ARRAY);
- private final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
- private final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false);
+ final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
+ final static Result EMPTY_RESULT_EXISTS_FALSE = Result.create(null, false);
private final static Result EMPTY_RESULT_STALE = Result.create(EMPTY_CELL_ARRAY, null, true);
private final static Result EMPTY_RESULT_EXISTS_TRUE_STALE
= Result.create((Cell[])null, true, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 177b1c7..421907d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -89,7 +89,7 @@ public final class ResponseConverter {
int requestRegionActionCount = request.getRegionActionCount();
int responseRegionActionResultCount = response.getRegionActionResultCount();
if (requestRegionActionCount != responseRegionActionResultCount) {
- throw new IllegalStateException("Request mutation count=" + responseRegionActionResultCount +
+ throw new IllegalStateException("Request mutation count=" + requestRegionActionCount +
" does not match response mutation result count=" + responseRegionActionResultCount);
}
@@ -125,21 +125,27 @@ public final class ResponseConverter {
responseValue = ProtobufUtil.toException(roe.getException());
} else if (roe.hasResult()) {
responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
- // add the load stats, if we got any
- if (roe.hasLoadStats()) {
- ((Result) responseValue).addResults(roe.getLoadStats());
- }
} else if (roe.hasServiceResult()) {
responseValue = roe.getServiceResult();
- } else {
- // no result & no exception. Unexpected.
- throw new IllegalStateException("No result & no exception roe=" + roe +
- " for region " + actions.getRegion());
+ } else{
+ // Sometimes, the response is just "it was processed". Generally, this occurs for things
+ // like mutateRows where either we get back 'processed' (or not) and optionally some
+ // statistics about the regions we touched.
+ responseValue = response.getProcessed() ?
+ ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
+ ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
}
results.add(regionName, roe.getIndex(), responseValue);
}
}
+ if (response.hasRegionStatistics()) {
+ ClientProtos.MultiRegionLoadStats stats = response.getRegionStatistics();
+ for (int i = 0; i < stats.getRegionCount(); i++) {
+ results.addStatistic(stats.getRegion(i).getValue().toByteArray(), stats.getStat(i));
+ }
+ }
+
return results;
}
@@ -161,11 +167,9 @@ public final class ResponseConverter {
* @param r
* @return an action result builder
*/
- public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r,
- ClientProtos.RegionLoadStats stats) {
+ public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) {
ResultOrException.Builder builder = ResultOrException.newBuilder();
if (r != null) builder.setResult(r);
- if(stats != null) builder.setLoadStats(stats);
return builder;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef712df9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 645cc42..1003d24 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -185,10 +185,12 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+ protected RpcRetryingCaller<MultiResponse> createCaller(
+ PayloadCarryingServerCallable callable) {
callsCt.incrementAndGet();
+ MultiServerCallable callable1 = (MultiServerCallable) callable;
final MultiResponse mr = createMultiResponse(
- callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+ callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@Override
public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
if (Arrays.equals(FAILS, a.getAction().getRow())) {
@@ -227,7 +229,8 @@ public class TestAsyncProcess {
}
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+ public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ int callTimeout)
throws IOException, RuntimeException {
throw e;
}
@@ -245,7 +248,8 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+ protected RpcRetryingCaller<MultiResponse> createCaller(
+ PayloadCarryingServerCallable callable) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
}
@@ -282,7 +286,8 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
- MultiServerCallable<Row> callable) {
+ PayloadCarryingServerCallable payloadCallable) {
+ MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@Override
@@ -312,7 +317,8 @@ public class TestAsyncProcess {
return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+ public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ int callTimeout)
throws IOException, RuntimeException {
long sleep = -1;
if (isDefault) {