You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/02 06:35:30 UTC

[21/50] [abbrv] hbase git commit: HBASE-20084 Refactor the RSRpcServices#doBatchOp

HBASE-20084 Refactor the RSRpcServices#doBatchOp

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/197bd790
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/197bd790
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/197bd790

Branch: refs/heads/HBASE-19064
Commit: 197bd790701553bd5c7de8b6af47500e0e028920
Parents: 7f6e971
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Mon Feb 26 20:49:05 2018 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Wed Feb 28 15:15:34 2018 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/RSRpcServices.java       | 115 ++++++++++---------
 1 file changed, 58 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/197bd790/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7e01c9a..4dd826f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -763,7 +764,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
     // one at a time, we instead pass them in batch.  Be aware that the corresponding
     // ResultOrException instance that matches each Put or Delete is then added down in the
-    // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
+    // doNonAtomicBatchOp call.  We should be staying aligned though the Put and Delete are
+    // deferred/batched
     List<ClientProtos.Action> mutations = null;
     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
     IOException sizeIOE = null;
@@ -802,7 +804,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // use it for the response.
           //
           // This will create a copy in the builder.
-          hasResultOrException = true;
           NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
           resultOrExceptionBuilder.setException(pair);
           context.incrementResponseExceptionSize(pair.getSerializedSize());
@@ -829,29 +830,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
         } else if (action.hasServiceCall()) {
           hasResultOrException = true;
-          try {
-            com.google.protobuf.Message result =
-                execServiceOnRegion(region, action.getServiceCall());
-            ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
-                ClientProtos.CoprocessorServiceResult.newBuilder();
-            resultOrExceptionBuilder.setServiceResult(
-                serviceResultBuilder.setValue(
-                  serviceResultBuilder.getValueBuilder()
-                    .setName(result.getClass().getName())
-                    // TODO: Copy!!!
-                    .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
-          } catch (IOException ioe) {
-            rpcServer.getMetrics().exception(ioe);
-            NameBytesPair pair = ResponseConverter.buildException(ioe);
-            resultOrExceptionBuilder.setException(pair);
-            context.incrementResponseExceptionSize(pair.getSerializedSize());
-          }
+          com.google.protobuf.Message result =
+            execServiceOnRegion(region, action.getServiceCall());
+          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
+            ClientProtos.CoprocessorServiceResult.newBuilder();
+          resultOrExceptionBuilder.setServiceResult(
+            serviceResultBuilder.setValue(
+              serviceResultBuilder.getValueBuilder()
+                .setName(result.getClass().getName())
+                // TODO: Copy!!!
+                .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
         } else if (action.hasMutation()) {
           MutationType type = action.getMutation().getMutateType();
           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
               !mutations.isEmpty()) {
             // Flush out any Puts or Deletes already collected.
-            doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
+            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
+              spaceQuotaEnforcement);
             mutations.clear();
           }
           switch (type) {
@@ -896,7 +891,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         // Could get to here and there was no result and no exception.  Presumes we added
         // a Put or Delete to the collecting Mutations List for adding later.  In this
         // case the corresponding ResultOrException instance for the Put or Delete will be added
-        // down in the doBatchOp method call rather than up here.
+        // down in the doNonAtomicBatchOp method call rather than up here.
       } catch (IOException ie) {
         rpcServer.getMetrics().exception(ie);
         hasResultOrException = true;
@@ -911,18 +906,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
     }
     // Finish up any outstanding mutations
-    if (mutations != null && !mutations.isEmpty()) {
-      try {
-        doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
-      } catch (IOException ioe) {
-        // TODO do the refactor to avoid this catch as it is useless
-        // doBatchOp has handled the IOE for all non-atomic operations.
-        rpcServer.getMetrics().exception(ioe);
-        NameBytesPair pair = ResponseConverter.buildException(ioe);
-        resultOrExceptionBuilder.setException(pair);
-        context.incrementResponseExceptionSize(pair.getSerializedSize());
-        builder.addResultOrException(resultOrExceptionBuilder.build());
-      }
+    if (!CollectionUtils.isEmpty(mutations)) {
+      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
     }
     return cellsToReturn;
   }
@@ -943,6 +928,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
+    final OperationQuota quota, final List<ClientProtos.Action> mutations,
+    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
+    throws IOException {
+    // Just throw the exception. The exception will be caught and then added as the region-level
+    // exception for the RegionAction. Leaving the action result null is ok since a null
+    // result is viewed as failure by hbase client. And the region-level exception will be used
+    // to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
+    // AsyncBatchRpcRetryingCaller#onComplete for more details.
+    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
+  }
+
+  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
+    final OperationQuota quota, final List<ClientProtos.Action> mutations,
+    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
+    try {
+      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
+    } catch (IOException e) {
+      // Set the exception for each action. The mutations in the same RegionAction are grouped
+      // into different batches and then processed individually. Hence, we don't set the
+      // region-level exception here for the whole RegionAction.
+      for (Action mutation : mutations) {
+        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
+      }
+    }
+  }
+
   /**
    * Execute a list of Put/Delete mutations.
    *
@@ -1029,30 +1041,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             break;
         }
       }
-    } catch (IOException ie) {
+    } finally {
       int processedMutationIndex = 0;
       for (Action mutation : mutations) {
         // The non-null mArray[i] means the cell scanner has been read.
         if (mArray[processedMutationIndex++] == null) {
           skipCellsForMutation(mutation, cells);
         }
-        if (!atomic) {
-          builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
-        }
-      }
-      if (atomic) {
-        throw ie;
       }
+      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
     }
+  }
+
+  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
+    boolean batchContainsDelete) {
     if (regionServer.metricsRegionServer != null) {
       long after = EnvironmentEdgeManager.currentTime();
       if (batchContainsPuts) {
-        regionServer.metricsRegionServer.updatePutBatch(
-            region.getTableDescriptor().getTableName(), after - before);
+        regionServer.metricsRegionServer
+          .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
       }
       if (batchContainsDelete) {
-        regionServer.metricsRegionServer.updateDeleteBatch(
-            region.getTableDescriptor().getTableName(), after - before);
+        regionServer.metricsRegionServer
+          .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
       }
     }
   }
@@ -1121,17 +1132,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       return region.batchReplay(mutations.toArray(
         new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
     } finally {
-      if (regionServer.metricsRegionServer != null) {
-        long after = EnvironmentEdgeManager.currentTime();
-        if (batchContainsPuts) {
-          regionServer.metricsRegionServer.updatePutBatch(
-              region.getTableDescriptor().getTableName(), after - before);
-        }
-        if (batchContainsDelete) {
-          regionServer.metricsRegionServer.updateDeleteBatch(
-              region.getTableDescriptor().getTableName(), after - before);
-        }
-      }
+      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
     }
   }
 
@@ -2614,8 +2615,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                   cellScanner, row, family, qualifier, op,
                   comparator, regionActionResultBuilder, spaceQuotaEnforcement);
           } else {
-            doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
-              cellScanner, spaceQuotaEnforcement, true);
+            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
+              cellScanner, spaceQuotaEnforcement);
             processed = Boolean.TRUE;
           }
         } catch (IOException e) {