You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/02 06:35:30 UTC
[21/50] [abbrv] hbase git commit: HBASE-20084 Refactor the RSRpcServices#doBatchOp
HBASE-20084 Refactor the RSRpcServices#doBatchOp
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/197bd790
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/197bd790
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/197bd790
Branch: refs/heads/HBASE-19064
Commit: 197bd790701553bd5c7de8b6af47500e0e028920
Parents: 7f6e971
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Mon Feb 26 20:49:05 2018 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Wed Feb 28 15:15:34 2018 +0800
----------------------------------------------------------------------
.../hbase/regionserver/RSRpcServices.java | 115 ++++++++++---------
1 file changed, 58 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/197bd790/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7e01c9a..4dd826f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@@ -763,7 +764,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is then added down in the
- // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
+ // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
+ // deferred/batched
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null;
@@ -802,7 +804,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// use it for the response.
//
// This will create a copy in the builder.
- hasResultOrException = true;
NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
resultOrExceptionBuilder.setException(pair);
context.incrementResponseExceptionSize(pair.getSerializedSize());
@@ -829,29 +830,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
} else if (action.hasServiceCall()) {
hasResultOrException = true;
- try {
- com.google.protobuf.Message result =
- execServiceOnRegion(region, action.getServiceCall());
- ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
- ClientProtos.CoprocessorServiceResult.newBuilder();
- resultOrExceptionBuilder.setServiceResult(
- serviceResultBuilder.setValue(
- serviceResultBuilder.getValueBuilder()
- .setName(result.getClass().getName())
- // TODO: Copy!!!
- .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
- } catch (IOException ioe) {
- rpcServer.getMetrics().exception(ioe);
- NameBytesPair pair = ResponseConverter.buildException(ioe);
- resultOrExceptionBuilder.setException(pair);
- context.incrementResponseExceptionSize(pair.getSerializedSize());
- }
+ com.google.protobuf.Message result =
+ execServiceOnRegion(region, action.getServiceCall());
+ ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
+ ClientProtos.CoprocessorServiceResult.newBuilder();
+ resultOrExceptionBuilder.setServiceResult(
+ serviceResultBuilder.setValue(
+ serviceResultBuilder.getValueBuilder()
+ .setName(result.getClass().getName())
+ // TODO: Copy!!!
+ .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
} else if (action.hasMutation()) {
MutationType type = action.getMutation().getMutateType();
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected.
- doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
+ doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
+ spaceQuotaEnforcement);
mutations.clear();
}
switch (type) {
@@ -896,7 +891,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Could get to here and there was no result and no exception. Presumes we added
// a Put or Delete to the collecting Mutations List for adding later. In this
// case the corresponding ResultOrException instance for the Put or Delete will be added
- // down in the doBatchOp method call rather than up here.
+ // down in the doNonAtomicBatchOp method call rather than up here.
} catch (IOException ie) {
rpcServer.getMetrics().exception(ie);
hasResultOrException = true;
@@ -911,18 +906,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
// Finish up any outstanding mutations
- if (mutations != null && !mutations.isEmpty()) {
- try {
- doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
- } catch (IOException ioe) {
- // TODO do the refactor to avoid this catch as it is useless
- // doBatchOp has handled the IOE for all non-atomic operations.
- rpcServer.getMetrics().exception(ioe);
- NameBytesPair pair = ResponseConverter.buildException(ioe);
- resultOrExceptionBuilder.setException(pair);
- context.incrementResponseExceptionSize(pair.getSerializedSize());
- builder.addResultOrException(resultOrExceptionBuilder.build());
- }
+ if (!CollectionUtils.isEmpty(mutations)) {
+ doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
}
return cellsToReturn;
}
@@ -943,6 +928,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
+ final OperationQuota quota, final List<ClientProtos.Action> mutations,
+ final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
+ throws IOException {
+ // Just throw the exception. The exception will be caught and then added to region-level
+ // exception for RegionAction. Leaving the null to action result is ok since the null
+ // result is viewed as failure by hbase client. And the region-level exception will be used
+ // to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
+ // AsyncBatchRpcRetryingCaller#onComplete for more details.
+ doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
+ }
+
+ private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
+ final OperationQuota quota, final List<ClientProtos.Action> mutations,
+ final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
+ try {
+ doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
+ } catch (IOException e) {
+ // Set the exception for each action. The mutations in the same RegionAction are grouped into
+ // different batches and then processed individually. Hence, we don't set the region-level
+ // exception here for the whole RegionAction.
+ for (Action mutation : mutations) {
+ builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
+ }
+ }
+ }
+
/**
* Execute a list of Put/Delete mutations.
*
@@ -1029,30 +1041,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break;
}
}
- } catch (IOException ie) {
+ } finally {
int processedMutationIndex = 0;
for (Action mutation : mutations) {
// The non-null mArray[i] means the cell scanner has been read.
if (mArray[processedMutationIndex++] == null) {
skipCellsForMutation(mutation, cells);
}
- if (!atomic) {
- builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
- }
- }
- if (atomic) {
- throw ie;
}
+ updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
}
+ }
+
+ private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
+ boolean batchContainsDelete) {
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
if (batchContainsPuts) {
- regionServer.metricsRegionServer.updatePutBatch(
- region.getTableDescriptor().getTableName(), after - before);
+ regionServer.metricsRegionServer
+ .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
}
if (batchContainsDelete) {
- regionServer.metricsRegionServer.updateDeleteBatch(
- region.getTableDescriptor().getTableName(), after - before);
+ regionServer.metricsRegionServer
+ .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
}
}
}
@@ -1121,17 +1132,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return region.batchReplay(mutations.toArray(
new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
} finally {
- if (regionServer.metricsRegionServer != null) {
- long after = EnvironmentEdgeManager.currentTime();
- if (batchContainsPuts) {
- regionServer.metricsRegionServer.updatePutBatch(
- region.getTableDescriptor().getTableName(), after - before);
- }
- if (batchContainsDelete) {
- regionServer.metricsRegionServer.updateDeleteBatch(
- region.getTableDescriptor().getTableName(), after - before);
- }
- }
+ updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
}
}
@@ -2614,8 +2615,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
cellScanner, row, family, qualifier, op,
comparator, regionActionResultBuilder, spaceQuotaEnforcement);
} else {
- doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
- cellScanner, spaceQuotaEnforcement, true);
+ doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
+ cellScanner, spaceQuotaEnforcement);
processed = Boolean.TRUE;
}
} catch (IOException e) {