You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/07/03 05:53:13 UTC

svn commit: r1356566 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ test/java/org/apache/hadoop/hbase/regionserver/

Author: tedyu
Date: Tue Jul  3 03:53:11 2012
New Revision: 1356566

URL: http://svn.apache.org/viewvc?rev=1356566&view=rev
Log:
HBASE-6284 HRegion#doMiniBatchMutation() (Anoop)


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1356566&r1=1356565&r2=1356566&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jul  3 03:53:11 2012
@@ -1778,15 +1778,14 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Setup a Delete object with correct timestamps.
-   * Caller should the row and region locks.
-   * @param delete
+   * Setup correct timestamps in the KVs in Delete object.
+   * Caller should have the row and region locks.
+   * @param familyMap
    * @param now
    * @throws IOException
    */
-  void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
+  void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow)
       throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
 
       byte[] family = e.getKey();
@@ -1855,7 +1854,7 @@ public class HRegion implements HeapSize
 
     updatesLock.readLock().lock();
     try {
-      prepareDeleteTimestamps(delete, byteNow);
+      prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
 
       if (writeToWAL) {
         // write/sync to WAL should happen before we touch memstore.
@@ -1986,27 +1985,27 @@ public class HRegion implements HeapSize
    */
   public OperationStatus[] put(Put[] puts) throws IOException {
     @SuppressWarnings("unchecked")
-    Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
+    Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
 
     for (int i = 0; i < puts.length; i++) {
-      putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
+      putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
     }
-    return put(putsAndLocks);
+    return batchMutate(putsAndLocks);
   }
 
   /**
-   * Perform a batch of puts.
-   *
+   * Perform a batch of mutations.
+   * It supports only Put and Delete mutations and will ignore other types passed.
    * @param putsAndLocks
-   *          the list of puts paired with their requested lock IDs.
+   *          the list of mutations paired with their requested lock IDs.
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] put(
-      Pair<Put, Integer>[] putsAndLocks) throws IOException {
-    BatchOperationInProgress<Pair<Put, Integer>> batchOp =
-      new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
+  public OperationStatus[] batchMutate(
+      Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
+    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
+      new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
 
     boolean initialized = false;
 
@@ -2020,10 +2019,10 @@ public class HRegion implements HeapSize
       try {
         if (!initialized) {
           this.writeRequestsCount.increment(); 
-          doPrePutHook(batchOp);
+          doPreMutationHook(batchOp);
           initialized = true;
         }
-        long addedSize = doMiniBatchPut(batchOp);
+        long addedSize = doMiniBatchMutation(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
         closeRegionOperation();
@@ -2035,18 +2034,32 @@ public class HRegion implements HeapSize
     return batchOp.retCodeDetails;
   }
 
-  private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
+  private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0 ; i < batchOp.operations.length; i++) {
-        Pair<Put, Integer> nextPair = batchOp.operations[i];
-        Put put = nextPair.getFirst();
-        if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
-          // pre hook says skip this Put
-          // mark as success and skip in doMiniBatchPut
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+        Pair<Mutation, Integer> nextPair = batchOp.operations[i];
+        Mutation m = nextPair.getFirst();
+        if (m instanceof Put) {
+          if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
+            // pre hook says skip this Put
+            // mark as success and skip in doMiniBatchMutation
+            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          }
+        } else if (m instanceof Delete) {
+          if (coprocessorHost.preDelete((Delete) m, walEdit, m.getWriteToWAL())) {
+            // pre hook says skip this Delete
+            // mark as success and skip in doMiniBatchMutation
+            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          }
+        } else {
+          // In case of passing Append mutations along with the Puts and Deletes in batchMutate
+          // mark the operation return code as failure so that it will not be considered in
+          // the doMiniBatchMutation
+          batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
+              "Put/Delete mutations only supported in batchMutate() now");
         }
         if (!walEdit.isEmpty()) {
           batchOp.walEditsFromCoprocessors[i] = walEdit;
@@ -2058,15 +2071,18 @@ public class HRegion implements HeapSize
 
 
   @SuppressWarnings("unchecked")
-  private long doMiniBatchPut(
-    BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+  private long doMiniBatchMutation(
+    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp) throws IOException {
 
     // variable to note if all Put items are for the same CF -- metrics related
-    boolean cfSetConsistent = true;
+    boolean putsCfSetConsistent = true;
+    //The set of columnFamilies first seen for Put.
+    Set<byte[]> putsCfSet = null;
+    // variable to note if all Delete items are for the same CF -- metrics related
+    boolean deletesCfSetConsistent = true;
+    //The set of columnFamilies first seen for Delete.
+    Set<byte[]> deletesCfSet = null;
     
-    //The set of columnFamilies first seen.
-    Set<byte[]> cfSet = null;
-
     WALEdit walEdit = new WALEdit();
     
     long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
@@ -2085,6 +2101,7 @@ public class HRegion implements HeapSize
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
     boolean success = false;
+    int noOfPuts = 0, noOfDeletes = 0;
     try {
       // ------------------------------------
       // STEP 1. Try to acquire as many locks as we can, and ensure
@@ -2093,11 +2110,12 @@ public class HRegion implements HeapSize
       int numReadyToWrite = 0;
       long now = EnvironmentEdgeManager.currentTimeMillis();
       while (lastIndexExclusive < batchOp.operations.length) {
-        Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
-        Put put = nextPair.getFirst();
+        Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
+        Mutation mutation = nextPair.getFirst();
+        boolean isPutMutation = mutation instanceof Put;
         Integer providedLockId = nextPair.getSecond();
 
-        Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+        Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
         // store the family map reference to allow for mutations
         familyMaps[lastIndexExclusive] = familyMap;
 
@@ -2108,22 +2126,25 @@ public class HRegion implements HeapSize
           continue;
         }
 
-        // Check the families in the put. If bad, skip this one.
         try {
-          checkFamilies(familyMap.keySet());
-          checkTimestamps(put, now);
+          if (isPutMutation) {
+            // Check the families in the put. If bad, skip this one.
+            checkFamilies(familyMap.keySet());
+            checkTimestamps(mutation.getFamilyMap(), now);
+          } else {
+            prepareDelete((Delete) mutation);
+          }
         } catch (DoNotRetryIOException dnrioe) {
-          LOG.warn("No such column family in batch put", dnrioe);
+          LOG.warn("No such column family in batch mutation", dnrioe);
           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
               OperationStatusCode.SANITY_CHECK_FAILURE, dnrioe.getMessage());
           lastIndexExclusive++;
           continue;
         }
-
         // If we haven't got any rows in our batch, we should block to
         // get the next one.
         boolean shouldBlock = numReadyToWrite == 0;
-        Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
+        Integer acquiredLockId = getLock(providedLockId, mutation.getRow(), shouldBlock);
         if (acquiredLockId == null) {
           // We failed to grab another lock
           assert !shouldBlock : "Should never fail to get lock when blocking";
@@ -2135,25 +2156,35 @@ public class HRegion implements HeapSize
         lastIndexExclusive++;
         numReadyToWrite++;
 
-        //If Column Families stay consistent through out all of the
-        //individual puts then metrics can be reported as a mutliput across
-        //column families in the first put.
-        if (cfSet == null) {
-          cfSet = put.getFamilyMap().keySet();
+        if (isPutMutation) {
+          // If Column Families stay consistent through out all of the
+          // individual puts then metrics can be reported as a mutliput across
+          // column families in the first put.
+          if (putsCfSet == null) {
+            putsCfSet = mutation.getFamilyMap().keySet();
+          } else {
+            putsCfSetConsistent = putsCfSetConsistent
+                && mutation.getFamilyMap().keySet().equals(putsCfSet);
+          }
         } else {
-          cfSetConsistent = cfSetConsistent && put.getFamilyMap().keySet().equals(cfSet);
+          if (deletesCfSet == null) {
+            deletesCfSet = mutation.getFamilyMap().keySet();
+          } else {
+            deletesCfSetConsistent = deletesCfSetConsistent
+                && mutation.getFamilyMap().keySet().equals(deletesCfSet);
+          }
         }
       }
 
       // we should record the timestamp only after we have acquired the rowLock,
-      // otherwise, newer puts are not guaranteed to have a newer timestamp
+      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
       now = EnvironmentEdgeManager.currentTimeMillis();
       byte[] byteNow = Bytes.toBytes(now);
 
-      // Nothing to put -- an exception in the above such as NoSuchColumnFamily?
+      // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
       if (numReadyToWrite <= 0) return 0L;
 
-      // We've now grabbed as many puts off the list as we can
+      // We've now grabbed as many mutations off the list as we can
 
       // ------------------------------------
       // STEP 2. Update any LATEST_TIMESTAMP timestamps
@@ -2163,9 +2194,14 @@ public class HRegion implements HeapSize
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) continue;
 
-        updateKVTimestamps(
-            familyMaps[i].values(),
-            byteNow);
+        Mutation mutation = batchOp.operations[i].getFirst();
+        if (mutation instanceof Put) {
+          updateKVTimestamps(familyMaps[i].values(), byteNow);
+          noOfPuts++;
+        } else {
+          prepareDeleteTimestamps(familyMaps[i], byteNow);
+          noOfDeletes++;
+        }
       }
 
       this.updatesLock.readLock().lock();
@@ -2206,9 +2242,11 @@ public class HRegion implements HeapSize
         }
         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
-        Put p = batchOp.operations[i].getFirst();
-        if (!p.getWriteToWAL()) {
-          recordPutWithoutWal(p.getFamilyMap());
+        Mutation m = batchOp.operations[i].getFirst();
+        if (!m.getWriteToWAL()) {
+          if (m instanceof Put) {
+            recordPutWithoutWal(m.getFamilyMap());
+          }
           continue;
         }
         // Add WAL edits by CP
@@ -2225,7 +2263,7 @@ public class HRegion implements HeapSize
       // -------------------------
       // STEP 5. Append the edit to WAL. Do not sync wal.
       // -------------------------
-      Put first = batchOp.operations[firstIndex].getFirst();
+      Mutation first = batchOp.operations[firstIndex].getFirst();
       txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
                walEdit, first.getClusterId(), now, this.htableDescriptor);
 
@@ -2261,7 +2299,7 @@ public class HRegion implements HeapSize
 
       // ------------------------------------
       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
-      // sycned so that the coprocessor contract is adhered to.
+      // synced so that the coprocessor contract is adhered to.
       // ------------------------------------
       if (coprocessorHost != null) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2270,8 +2308,12 @@ public class HRegion implements HeapSize
               != OperationStatusCode.SUCCESS) {
             continue;
           }
-          Put p = batchOp.operations[i].getFirst();
-          coprocessorHost.postPut(p, walEdit, p.getWriteToWAL());
+          Mutation m = batchOp.operations[i].getFirst();
+          if (m instanceof Put) {
+            coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
+          } else {
+            coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+          }
         }
       }
 
@@ -2296,14 +2338,26 @@ public class HRegion implements HeapSize
       }
 
       // do after lock
-      final long endTimeMs = EnvironmentEdgeManager.currentTimeMillis();
+      final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis() - startTimeMs;
       
-      //See if the column families were consistent through the whole thing.
-      //if they were then keep them.  If they were not then pass a null.
-      //null will be treated as unknown.
-      final Set<byte[]> keptCfs = cfSetConsistent ? cfSet : null;
-      this.opMetrics.updateMultiPutMetrics(keptCfs, endTimeMs - startTimeMs);
-
+      // See if the column families were consistent through the whole thing.
+      // if they were then keep them. If they were not then pass a null.
+      // null will be treated as unknown.
+      // Total time taken might be involving Puts and Deletes.
+      // Split the time for puts and deletes based on the total number of Puts and Deletes.
+      long timeTakenForPuts = 0;
+      if (noOfPuts > 0) {
+        // There were some Puts in the batch.
+        double noOfMutations = noOfPuts + noOfDeletes;
+        timeTakenForPuts = (long) (netTimeMs * (noOfPuts / noOfMutations));
+        final Set<byte[]> keptCfs = putsCfSetConsistent ? putsCfSet : null;
+        this.opMetrics.updateMultiPutMetrics(keptCfs, timeTakenForPuts);
+      }
+      if (noOfDeletes > 0) {
+        // There were some Deletes in the batch.
+        final Set<byte[]> keptCfs = deletesCfSetConsistent ? deletesCfSet : null;
+        this.opMetrics.updateMultiDeleteMetrics(keptCfs, netTimeMs - timeTakenForPuts);
+      }
       if (!success) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
@@ -2612,10 +2666,10 @@ public class HRegion implements HeapSize
 
   /**
    * Remove all the keys listed in the map from the memstore. This method is
-   * called when a Put has updated memstore but subequently fails to update
+   * called when a Put/Delete has updated memstore but subequently fails to update
    * the wal. This method is then invoked to rollback the memstore.
    */
-  private void rollbackMemstore(BatchOperationInProgress<Pair<Put, Integer>> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
                                 Map<byte[], List<KeyValue>>[] familyMaps,
                                 int start, int end) {
     int kvsRolledback = 0;
@@ -2656,9 +2710,6 @@ public class HRegion implements HeapSize
       checkFamily(family);
     }
   }
-  private void checkTimestamps(Put p, long now) throws DoNotRetryIOException {
-    checkTimestamps(p.getFamilyMap(), now);
-  }
 
   void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
       long now) throws DoNotRetryIOException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1356566&r1=1356565&r2=1356566&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jul  3 03:53:11 2012
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
@@ -3224,7 +3225,7 @@ public class  HRegionServer implements C
         mutateRows(region, mutates);
       } else {
         ActionResult.Builder resultBuilder = null;
-        List<Mutate> puts = new ArrayList<Mutate>();
+        List<Mutate> mutates = new ArrayList<Mutate>();
         for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
           requestCount.incrementAndGet();
           try {
@@ -3239,10 +3240,10 @@ public class  HRegionServer implements C
             } else if (actionUnion.hasMutate()) {
               Mutate mutate = actionUnion.getMutate();
               MutateType type = mutate.getMutateType();
-              if (type != MutateType.PUT) {
-                if (!puts.isEmpty()) {
-                  put(builder, region, puts);
-                  puts.clear();
+              if (type != MutateType.PUT && type != MutateType.DELETE) {
+                if (!mutates.isEmpty()) {
+                  doBatchOp(builder, region, mutates);
+                  mutates.clear();
                 } else if (!region.getRegionInfo().isMetaTable()) {
                   cacheFlusher.reclaimMemStoreMemory();
                 }
@@ -3256,17 +3257,13 @@ public class  HRegionServer implements C
                 r = increment(region, mutate);
                 break;
               case PUT:
-                puts.add(mutate);
+                mutates.add(mutate);
                 break;
               case DELETE:
-                Delete delete = ProtobufUtil.toDelete(mutate);
-                Integer lock = getLockFromId(delete.getLockId());
-                region.delete(delete, lock, delete.getWriteToWAL());
-                r = new Result();
+                mutates.add(mutate);
                 break;
-                default:
-                  throw new DoNotRetryIOException(
-                    "Unsupported mutate type: " + type.name());
+              default:
+                throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
               }
               if (r != null) {
                 result = ProtobufUtil.toResult(r);
@@ -3294,8 +3291,8 @@ public class  HRegionServer implements C
             builder.addResult(ResponseConverter.buildActionResult(ie));
           }
         }
-        if (!puts.isEmpty()) {
-          put(builder, region, puts);
+        if (!mutates.isEmpty()) {
+          doBatchOp(builder, region, mutates);
         }
       }
       return builder.build();
@@ -3755,16 +3752,16 @@ public class  HRegionServer implements C
   }
 
   /**
-   * Execute a list of put mutations.
+   * Execute a list of Put/Delete mutations.
    *
    * @param builder
    * @param region
-   * @param puts
+   * @param mutates
    */
-  protected void put(final MultiResponse.Builder builder,
-      final HRegion region, final List<Mutate> puts) {
+  protected void doBatchOp(final MultiResponse.Builder builder,
+      final HRegion region, final List<Mutate> mutates) {
     @SuppressWarnings("unchecked")
-    Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+    Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutates.size()];
 
     try {
       ActionResult.Builder resultBuilder = ActionResult.newBuilder();
@@ -3773,19 +3770,24 @@ public class  HRegionServer implements C
       ActionResult result = resultBuilder.build();
 
       int i = 0;
-      for (Mutate put : puts) {
-        Put p = ProtobufUtil.toPut(put);
-        Integer lock = getLockFromId(p.getLockId());
-        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+      for (Mutate m : mutates) {
+        Mutation mutation = null;
+        if (m.getMutateType() == MutateType.PUT) {
+          mutation = ProtobufUtil.toPut(m);
+        } else {
+          mutation = ProtobufUtil.toDelete(m);
+        }
+        Integer lock = getLockFromId(mutation.getLockId());
+        mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, lock);
         builder.addResult(result);
       }
 
-      requestCount.addAndGet(puts.size());
+      requestCount.addAndGet(mutates.size());
       if (!region.getRegionInfo().isMetaTable()) {
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.put(putsWithLocks);
+      OperationStatus codes[] = region.batchMutate(mutationsWithLocks);
       for (i = 0; i < codes.length; i++) {
         if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
           result = ResponseConverter.buildActionResult(
@@ -3795,7 +3797,7 @@ public class  HRegionServer implements C
       }
     } catch (IOException ie) {
       ActionResult result = ResponseConverter.buildActionResult(ie);
-      for (int i = 0, n = puts.size(); i < n; i++) {
+      for (int i = 0, n = mutates.size(); i < n; i++) {
         builder.setResult(i, result);
       }
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java?rev=1356566&r1=1356565&r2=1356566&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java Tue Jul  3 03:53:11 2012
@@ -69,7 +69,7 @@ class MultiRowMutationProcessor extends 
       } else if (m instanceof Delete) {
         Delete d = (Delete) m;
         region.prepareDelete(d);
-        region.prepareDeleteTimestamps(d, byteNow);
+        region.prepareDeleteTimestamps(d.getFamilyMap(), byteNow);
       } else {
         throw new DoNotRetryIOException(
             "Action must be Put or Delete. But was: "

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java?rev=1356566&r1=1356565&r2=1356566&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java Tue Jul  3 03:53:11 2012
@@ -46,6 +46,7 @@ public class OperationMetrics {
   private static final String ICV_KEY = "incrementColumnValue_";
   private static final String INCREMENT_KEY = "increment_";
   private static final String MULTIPUT_KEY = "multiput_";
+  private static final String MULTIDELETE_KEY = "multidelete_";
   private static final String APPEND_KEY = "append_";
   
   /** Conf key controlling whether we should expose metrics.*/
@@ -102,6 +103,16 @@ public class OperationMetrics {
   }
 
   /**
+   * Update the stats associated with {@link HTable#delete(java.util.List)}.
+   *
+   * @param columnFamilies Set of CF's this multidelete is associated with
+   * @param value the time
+   */
+  public void updateMultiDeleteMetrics(Set<byte[]> columnFamilies, long value) {
+    doUpdateTimeVarying(columnFamilies, MULTIDELETE_KEY, value);
+  }
+  
+  /**
    * Update the metrics associated with a {@link Get}
    * 
    * @param columnFamilies

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1356566&r1=1356565&r2=1356566&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Tue Jul  3 03:53:11 2012
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -610,14 +611,14 @@ public class TestHRegion extends HBaseTe
       LOG.info("Nexta, a batch put which uses an already-held lock");
       lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
       LOG.info("...obtained row lock");
-      List<Pair<Put, Integer>> putsAndLocks = Lists.newArrayList();
+      List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
       for (int i = 0; i < 10; i++) {
-        Pair<Put, Integer> pair = new Pair<Put, Integer>(puts[i], null);
+        Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[i], null);
         if (i == 2) pair.setSecond(lockedRow);
         putsAndLocks.add(pair);
       }
   
-      codes = region.put(putsAndLocks.toArray(new Pair[0]));
+      codes = region.batchMutate(putsAndLocks.toArray(new Pair[0]));
       LOG.info("...performed put");
       for (int i = 0; i < 10; i++) {
         assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :