You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/05/01 22:48:37 UTC

svn commit: r1332811 - /hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Author: larsh
Date: Tue May  1 20:48:37 2012
New Revision: 1332811

URL: http://svn.apache.org/viewvc?rev=1332811&view=rev
Log:
HBASE-5897 prePut coprocessor hook causing substantial CPU usage

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1332811&r1=1332810&r2=1332811&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May  1 20:48:37 2012
@@ -1928,10 +1928,12 @@ public class HRegion implements HeapSize
     T[] operations;
     int nextIndexToProcess = 0;
     OperationStatus[] retCodeDetails;
+    WALEdit[] walEditsFromCoprocessors;
 
     public BatchOperationInProgress(T[] operations) {
       this.operations = operations;
       this.retCodeDetails = new OperationStatus[operations.length];
+      this.walEditsFromCoprocessors = new WALEdit[operations.length];
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
     }
 
@@ -1968,14 +1970,21 @@ public class HRegion implements HeapSize
     BatchOperationInProgress<Pair<Put, Integer>> batchOp =
       new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
 
+    boolean initialized = false;
+
     while (!batchOp.isDone()) {
       checkReadOnly();
       checkResources();
 
       long newSize;
       startRegionOperation();
-      this.writeRequestsCount.increment();
+
       try {
+        if (!initialized) {
+          this.writeRequestsCount.increment(); 
+          doPrePutHook(batchOp);
+          initialized = true;
+        }
         long addedSize = doMiniBatchPut(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
@@ -1988,33 +1997,42 @@ public class HRegion implements HeapSize
     return batchOp.retCodeDetails;
   }
 
+  private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
+      throws IOException {
+    /* Run every Put's coprocessor prePut hook up front, outside of locks to avoid deadlock */
+    WALEdit walEdit = new WALEdit(); // scratch edit passed to prePut; reused while it stays empty
+    if (coprocessorHost != null) {
+      for (int i = 0 ; i < batchOp.operations.length; i++) {
+        Pair<Put, Integer> nextPair = batchOp.operations[i];
+        Put put = nextPair.getFirst();
+        if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
+          // prePut returned true: the hook says skip this Put,
+          // so mark it as success here and skip it in doMiniBatchPut
+          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+        }
+        if (!walEdit.isEmpty()) {
+          batchOp.walEditsFromCoprocessors[i] = walEdit; // keep the hook's edits at this Put's index
+          walEdit = new WALEdit(); // next Put starts from a fresh, empty edit
+        }
+      }
+    }
+  }
+
+
   @SuppressWarnings("unchecked")
   private long doMiniBatchPut(
       BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
 
-    final String tableName = getTableDesc().getNameAsString();
-
     // variable to note if all Put items are for the same CF -- metrics related
     boolean cfSetConsistent = true;
     
     //The set of columnFamilies first seen.
     Set<byte[]> cfSet = null;
+
+    WALEdit walEdit = new WALEdit();
     
     long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
 
-    WALEdit walEdit = new WALEdit();
-    /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    if (coprocessorHost != null) {
-      for (int i = 0; i < batchOp.operations.length; i++) {
-        Pair<Put, Integer> nextPair = batchOp.operations[i];
-        Put put = nextPair.getFirst();
-        if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
-          // pre hook says skip this Put
-          // mark as success and skip below
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
-        }
-      }
-    }
 
     MultiVersionConsistencyControl.WriteEntry w = null;
     long txid = 0;
@@ -2152,7 +2170,15 @@ public class HRegion implements HeapSize
 
         Put p = batchOp.operations[i].getFirst();
         if (!p.getWriteToWAL()) continue;
+        // Add WAL edits by CP
+        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
+        if (fromCP != null) {
+          for (KeyValue kv : fromCP.getKeyValues()) {
+            walEdit.add(kv);
+          }
+        }
         addFamilyMapToWALEdit(familyMaps[i], walEdit);
+
       }
 
       // -------------------------