You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/05/01 22:57:39 UTC

svn commit: r1332816 - in /hbase/branches/0.92: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Author: larsh
Date: Tue May  1 20:57:39 2012
New Revision: 1332816

URL: http://svn.apache.org/viewvc?rev=1332816&view=rev
Log:
HBASE-5897 prePut coprocessor hook causing substantial CPU usage

Modified:
    hbase/branches/0.92/CHANGES.txt
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1332816&r1=1332815&r2=1332816&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Tue May  1 20:57:39 2012
@@ -56,6 +56,7 @@ Release 0.92.2 - Unreleased
    HBASE-5893  Allow spaces in coprocessor conf (aka trim() className) (Matteo Bertozzi)
    HBASE-5611  Replayed edits from regions that failed to open during recovery aren't removed from
                the global MemStore size (Jieshan)
+   HBASE-5897  prePut coprocessor hook causing substantial CPU usage (Todd)
 
   IMPROVEMENTS
    HBASE-5592  Make it easier to get a table from shell (Ben West)

Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1332816&r1=1332815&r2=1332816&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May  1 20:57:39 2012
@@ -1680,10 +1680,12 @@ public class HRegion implements HeapSize
     T[] operations;
     int nextIndexToProcess = 0;
     OperationStatus[] retCodeDetails;
+    WALEdit[] walEditsFromCoprocessors;
 
     public BatchOperationInProgress(T[] operations) {
       this.operations = operations;
       this.retCodeDetails = new OperationStatus[operations.length];
+      this.walEditsFromCoprocessors = new WALEdit[operations.length];
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
     }
 
@@ -1720,14 +1722,20 @@ public class HRegion implements HeapSize
     BatchOperationInProgress<Pair<Put, Integer>> batchOp =
       new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
 
+    boolean initialized = false;
+
     while (!batchOp.isDone()) {
       checkReadOnly();
       checkResources();
 
       long newSize;
       startRegionOperation();
-      this.writeRequestsCount.increment();
       try {
+        if (!initialized) {
+          this.writeRequestsCount.increment();
+          doPrePutHook(batchOp);
+          initialized = true;
+        }
         long addedSize = doMiniBatchPut(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
@@ -1740,23 +1748,32 @@ public class HRegion implements HeapSize
     return batchOp.retCodeDetails;
   }
 
-  @SuppressWarnings("unchecked")
-  private long doMiniBatchPut(
-      BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
-
-    WALEdit walEdit = new WALEdit();
+  private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
+      throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
+    WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0; i < batchOp.operations.length; i++) {
         Pair<Put, Integer> nextPair = batchOp.operations[i];
         Put put = nextPair.getFirst();
         if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
           // pre hook says skip this Put
-          // mark as success and skip below
+          // mark as success and skip in doMiniBatchPut
           batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
         }
+        if (!walEdit.isEmpty()) {
+          batchOp.walEditsFromCoprocessors[i] = walEdit;
+          walEdit = new WALEdit();
+        }
       }
     }
+  }
+
+  @SuppressWarnings("unchecked")
+  private long doMiniBatchPut(
+      BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+
+    WALEdit walEdit = new WALEdit();
 
     long now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
@@ -1852,6 +1869,13 @@ public class HRegion implements HeapSize
 
         Put p = batchOp.operations[i].getFirst();
         if (!p.getWriteToWAL()) continue;
+        // Add WAL edits by CP
+        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
+        if (fromCP != null) {
+          for (KeyValue kv : fromCP.getKeyValues()) {
+            walEdit.add(kv);
+          }
+        }
         addFamilyMapToWALEdit(familyMaps[i], walEdit);
       }