You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/07/17 07:34:00 UTC

svn commit: r1504002 - in /hbase/trunk: hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-ser...

Author: larsh
Date: Wed Jul 17 05:34:00 2013
New Revision: 1504002

URL: http://svn.apache.org/r1504002
Log:
HBASE-8877 Reentrant row locks (Dave Latham)

Modified:
    hbase/trunk/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java

Modified: hbase/trunk/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java (original)
+++ hbase/trunk/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java Wed Jul 17 05:34:00 2013
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -146,20 +145,19 @@ public class BulkDeleteEndpoint extends 
           }
         }
         if (deleteRows.size() > 0) {
-          Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+          Mutation[] deleteArr = new Mutation[deleteRows.size()];
           int i = 0;
           for (List<KeyValue> deleteRow : deleteRows) {
-            Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
-            deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
+            deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
           }
-          OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+          OperationStatus[] opStatus = region.batchMutate(deleteArr);
           for (i = 0; i < opStatus.length; i++) {
             if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
               break;
             }
             totalRowsDeleted++;
             if (deleteType == DeleteType.VERSION) {
-              byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+              byte[] versionsDeleted = deleteArr[i].getAttribute(
                   NO_OF_VERSIONS_TO_DELETE);
               if (versionsDeleted != null) {
                 totalVersionsDeleted += Bytes.toInt(versionsDeleted);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Wed Jul 17 05:34:00 2013
@@ -253,12 +253,12 @@ public abstract class BaseRegionObserver
   
   @Override
   public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
   }
 
   @Override
   public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Jul 17 05:34:00 2013
@@ -554,7 +554,7 @@ public interface RegionObserver extends 
    * @throws IOException if an error occurred on the coprocessor
    */
   void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
 
   /**
    * This will be called after applying a batch of Mutations on a region. The Mutations are added to
@@ -564,7 +564,7 @@ public interface RegionObserver extends 
    * @throws IOException if an error occurred on the coprocessor
    */
   void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
 
   /**
    * Called before checkAndPut

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jul 17 05:34:00 2013
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
-import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -138,6 +137,7 @@ import org.apache.hadoop.io.MultipleIOEx
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -223,12 +223,13 @@ public class HRegion implements HeapSize
   // Members
   //////////////////////////////////////////////////////////////////////////////
 
-  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
-    new ConcurrentHashMap<HashedBytes, CountDownLatch>();
-  private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
-    new ConcurrentHashMap<Integer, HashedBytes>();
-  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
-  static private Random rand = new Random();
+  // map from a locked row to the context for that lock including:
+  // - CountDownLatch for threads waiting on that row
+  // - the thread that owns the lock (allow reentrancy)
+  // - reference count of (reentrant) locks held by the thread
+  // - the row itself
+  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
+      new ConcurrentHashMap<HashedBytes, RowLockContext>();
 
   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
       Bytes.BYTES_RAWCOMPARATOR);
@@ -1764,7 +1765,7 @@ public class HRegion implements HeapSize
     try {
       delete.getRow();
       // All edits for the given row (across all column families) must happen atomically.
-      doBatchMutate(delete, null);
+      doBatchMutate(delete);
     } finally {
       closeRegionOperation();
     }
@@ -1787,7 +1788,7 @@ public class HRegion implements HeapSize
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setDurability(durability);
-    doBatchMutate(delete, null);
+    doBatchMutate(delete);
   }
 
   /**
@@ -1862,7 +1863,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     try {
       // All edits for the given row (across all column families) must happen atomically.
-      doBatchMutate(put, null);
+      doBatchMutate(put);
     } finally {
       closeRegionOperation();
     }
@@ -1892,46 +1893,29 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Perform a batch put with no pre-specified locks
-   * @see HRegion#batchMutate(Pair[])
-   */
-  public OperationStatus[] put(Put[] puts) throws IOException {
-    @SuppressWarnings("unchecked")
-    Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
-
-    for (int i = 0; i < puts.length; i++) {
-      putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
-    }
-    return batchMutate(putsAndLocks);
-  }
-
-  /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
-   * @param mutationsAndLocks
-   *          the list of mutations paired with their requested lock IDs.
+   * @param mutations the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] batchMutate(
-      Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
-    return batchMutate(mutationsAndLocks, false);
+  public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
+    return batchMutate(mutations, false);
   }
 
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
-   * @param mutationsAndLocks
-   *          the list of mutations paired with their requested lock IDs.
+   * @param mutations the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
+  OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)
       throws IOException {
-    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
-      new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
+    BatchOperationInProgress<Mutation> batchOp =
+      new BatchOperationInProgress<Mutation>(mutations);
 
     boolean initialized = false;
 
@@ -1969,14 +1953,13 @@ public class HRegion implements HeapSize
   }
 
 
-  private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
+  private void doPreMutationHook(BatchOperationInProgress<Mutation> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0 ; i < batchOp.operations.length; i++) {
-        Pair<Mutation, Integer> nextPair = batchOp.operations[i];
-        Mutation m = nextPair.getFirst();
+        Mutation m = batchOp.operations[i];
         if (m instanceof Put) {
           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
             // pre hook says skip this Put
@@ -2005,7 +1988,7 @@ public class HRegion implements HeapSize
   }
 
   @SuppressWarnings("unchecked")
-  private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+  private long doMiniBatchMutation(BatchOperationInProgress<Mutation> batchOp,
       boolean isInReplay) throws IOException {
 
     // variable to note if all Put items are for the same CF -- metrics related
@@ -2024,7 +2007,7 @@ public class HRegion implements HeapSize
     boolean locked = false;
 
     /** Keep track of the locks we hold so we can release them in finally clause */
-    List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
+    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[], List<? extends Cell>>[] familyMaps = new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
@@ -2040,10 +2023,8 @@ public class HRegion implements HeapSize
       int numReadyToWrite = 0;
       long now = EnvironmentEdgeManager.currentTimeMillis();
       while (lastIndexExclusive < batchOp.operations.length) {
-        Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
-        Mutation mutation = nextPair.getFirst();
+        Mutation mutation = batchOp.operations[lastIndexExclusive];
         boolean isPutMutation = mutation instanceof Put;
-        Integer providedLockId = nextPair.getSecond();
 
         Map<byte[], List<? extends Cell>> familyMap = mutation.getFamilyMap();
         // store the family map reference to allow for mutations
@@ -2081,25 +2062,25 @@ public class HRegion implements HeapSize
           lastIndexExclusive++;
           continue;
         }
+        
         // If we haven't got any rows in our batch, we should block to
         // get the next one.
         boolean shouldBlock = numReadyToWrite == 0;
-        Integer acquiredLockId = null;
+        RowLock rowLock = null;
         try {
-          acquiredLockId = getLock(providedLockId, mutation.getRow(),
-              shouldBlock);
+          rowLock = getRowLock(mutation.getRow(), shouldBlock);
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock in batch put, row="
-                  + Bytes.toStringBinary(mutation.getRow()), ioe);
+            + Bytes.toStringBinary(mutation.getRow()), ioe);
         }
-        if (acquiredLockId == null) {
+        if (rowLock == null) {
           // We failed to grab another lock
           assert !shouldBlock : "Should never fail to get lock when blocking";
           break; // stop acquiring more rows for this batch
+        } else {
+          acquiredRowLocks.add(rowLock);
         }
-        if (providedLockId == null) {
-          acquiredLocks.add(acquiredLockId);
-        }
+
         lastIndexExclusive++;
         numReadyToWrite++;
 
@@ -2141,7 +2122,7 @@ public class HRegion implements HeapSize
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) continue;
 
-        Mutation mutation = batchOp.operations[i].getFirst();
+        Mutation mutation = batchOp.operations[i];
         if (mutation instanceof Put) {
           updateKVTimestamps(familyMaps[i].values(), byteNow);
           noOfPuts++;
@@ -2162,8 +2143,8 @@ public class HRegion implements HeapSize
 
       // calling the pre CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp = 
+          new MiniBatchOperationInProgress<Mutation>(batchOp.operations, 
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
       }
@@ -2198,7 +2179,7 @@ public class HRegion implements HeapSize
         }
         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
-        Mutation m = batchOp.operations[i].getFirst();
+        Mutation m = batchOp.operations[i];
         Durability tmpDur = getEffectiveDurability(m.getDurability());
         if (tmpDur.ordinal() > durability.ordinal()) {
           durability = tmpDur;
@@ -2221,10 +2202,10 @@ public class HRegion implements HeapSize
       // -------------------------
       // STEP 5. Append the edit to WAL. Do not sync wal.
       // -------------------------
-      Mutation first = batchOp.operations[firstIndex].getFirst();
+      Mutation mutation = batchOp.operations[firstIndex];
       if (walEdit.size() > 0) {
         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
-               walEdit, first.getClusterId(), now, this.htableDescriptor);
+               walEdit, mutation.getClusterId(), now, this.htableDescriptor);
       }
 
       // -------------------------------
@@ -2234,12 +2215,8 @@ public class HRegion implements HeapSize
         this.updatesLock.readLock().unlock();
         locked = false;
       }
-      if (acquiredLocks != null) {
-        for (Integer toRelease : acquiredLocks) {
-          releaseRowLock(toRelease);
-        }
-        acquiredLocks = null;
-      }
+      releaseRowLocks(acquiredRowLocks);
+      
       // -------------------------
       // STEP 7. Sync wal.
       // -------------------------
@@ -2249,8 +2226,8 @@ public class HRegion implements HeapSize
       walSyncSuccessful = true;
       // calling the post CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp = 
+          new MiniBatchOperationInProgress<Mutation>(batchOp.operations, 
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
@@ -2274,7 +2251,7 @@ public class HRegion implements HeapSize
               != OperationStatusCode.SUCCESS) {
             continue;
           }
-          Mutation m = batchOp.operations[i].getFirst();
+          Mutation m = batchOp.operations[i];
           if (m instanceof Put) {
             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
           } else {
@@ -2296,12 +2273,7 @@ public class HRegion implements HeapSize
       if (locked) {
         this.updatesLock.readLock().unlock();
       }
-
-      if (acquiredLocks != null) {
-        for (Integer toRelease : acquiredLocks) {
-          releaseRowLock(toRelease);
-        }
-      }
+      releaseRowLocks(acquiredRowLocks);
 
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
@@ -2378,8 +2350,8 @@ public class HRegion implements HeapSize
       checkFamily(family);
       get.addColumn(family, qualifier);
 
-      // Lock row
-      Integer lid = getLock(null, get.getRow(), true);
+      // Lock row - note that doBatchMutate will relock this row if called
+      RowLock rowLock = getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
       List<KeyValue> result = null;
@@ -2425,27 +2397,23 @@ public class HRegion implements HeapSize
         if (matches) {
           // All edits for the given row (across all column families) must
           // happen atomically.
-          doBatchMutate((Mutation)w, lid);
+          doBatchMutate((Mutation)w);
           this.checkAndMutateChecksPassed.increment();
           return true;
         }
         this.checkAndMutateChecksFailed.increment();
         return false;
       } finally {
-        releaseRowLock(lid);
+        rowLock.release();
       }
     } finally {
       closeRegionOperation();
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
+  private void doBatchMutate(Mutation mutation) throws IOException,
       org.apache.hadoop.hbase.exceptions.DoNotRetryIOException {
-    Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] {
-      new Pair<Mutation, Integer>(mutation, lid)
-    };
-    OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -2621,7 +2589,7 @@ public class HRegion implements HeapSize
     Put p = new Put(row);
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
-    doBatchMutate(p, null);
+    doBatchMutate(p);
   }
 
   /**
@@ -2672,7 +2640,7 @@ public class HRegion implements HeapSize
    * called when a Put/Delete has updated memstore but subequently fails to update
    * the wal. This method is then invoked to rollback the memstore.
    */
-  private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress<Mutation> batchOp,
                                 Map<byte[], List<? extends Cell>>[] familyMaps,
                                 int start, int end) {
     int kvsRolledback = 0;
@@ -3182,138 +3150,76 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Obtain a lock on the given row.  Blocks until success.
-   *
-   * I know it's strange to have two mappings:
-   * <pre>
-   *   ROWS  ==> LOCKS
-   * </pre>
-   * as well as
-   * <pre>
-   *   LOCKS ==> ROWS
-   * </pre>
-   * <p>It would be more memory-efficient to just have one mapping;
-   * maybe we'll do that in the future.
-   *
-   * @param row Name of row to lock.
-   * @throws IOException
-   * @return The id of the held lock.
-   */
-  public Integer obtainRowLock(final byte [] row) throws IOException {
-    startRegionOperation();
-    this.writeRequestsCount.increment();
-    try {
-      return internalObtainRowLock(row, true);
-    } finally {
-      closeRegionOperation();
-    }
-  }
-
-  /**
-   * Obtains or tries to obtain the given row lock.
+   * Tries to acquire a lock on the given row.
    * @param waitForLock if true, will block until the lock is available.
    *        Otherwise, just tries to obtain the lock and returns
-   *        null if unavailable.
+   *        false if unavailable.
+   * @return the row lock if acquired,
+   *   null if waitForLock was false and the lock was not acquired
+   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
    */
-  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
-  throws IOException {
+  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
     checkRow(row, "row lock");
     startRegionOperation();
     try {
       HashedBytes rowKey = new HashedBytes(row);
-      CountDownLatch rowLatch = new CountDownLatch(1);
+      RowLockContext rowLockContext = new RowLockContext(rowKey);
 
       // loop until we acquire the row lock (unless !waitForLock)
       while (true) {
-        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
-        if (existingLatch == null) {
+        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+        if (existingContext == null) {
+          // Row is not already locked by any thread, use newly created context.
+          break;
+        } else if (existingContext.ownedByCurrentThread()) {
+          // Row is already locked by current thread, reuse existing context instead.
+          rowLockContext = existingContext;
           break;
         } else {
-          // row already locked
+          // Row is already locked by some other thread, give up or wait for it
           if (!waitForLock) {
             return null;
           }
           try {
-            if (!existingLatch.await(this.rowLockWaitDuration,
-                            TimeUnit.MILLISECONDS)) {
-              throw new IOException("Timed out on getting lock for row="
-                  + Bytes.toStringBinary(row));
+            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+              throw new IOException("Timed out waiting for lock for row: " + rowKey);
             }
           } catch (InterruptedException ie) {
-            LOG.warn("internalObtainRowLock interrupted for row=" + Bytes.toStringBinary(row));
+            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
             InterruptedIOException iie = new InterruptedIOException();
             iie.initCause(ie);
             throw iie;
           }
         }
       }
-
-      // loop until we generate an unused lock id
-      while (true) {
-        Integer lockId = lockIdGenerator.incrementAndGet();
-        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
-        if (existingRowKey == null) {
-          return lockId;
-        } else {
-          // lockId already in use, jump generator to a new spot
-          lockIdGenerator.set(rand.nextInt());
-        }
-      }
+      
+      // allocate new lock for this thread
+      return rowLockContext.newLock();
     } finally {
       closeRegionOperation();
     }
   }
 
   /**
-   * Release the row lock!
-   * @param lockId  The lock ID to release.
-   */
-  public void releaseRowLock(final Integer lockId) {
-    if (lockId == null) return; // null lock id, do nothing
-    HashedBytes rowKey = lockIds.remove(lockId);
-    if (rowKey == null) {
-      LOG.warn("Release unknown lockId: " + lockId);
-      return;
-    }
-    CountDownLatch rowLatch = lockedRows.remove(rowKey);
-    if (rowLatch == null) {
-      LOG.error("Releases row not locked, lockId: " + lockId + " row: "
-          + rowKey);
-      return;
-    }
-    rowLatch.countDown();
-  }
-
-  /**
-   * See if row is currently locked.
-   * @param lockId
-   * @return boolean
+   * Acqures a lock on the given row.
+   * The same thread may acquire multiple locks on the same row.
+   * @return the acquired row lock
+   * @throws IOException if the lock could not be acquired after waiting
    */
-  boolean isRowLocked(final Integer lockId) {
-    return lockIds.containsKey(lockId);
+  public RowLock getRowLock(byte[] row) throws IOException {
+    return getRowLock(row, true);
   }
 
   /**
-   * Returns existing row lock if found, otherwise
-   * obtains a new row lock and returns it.
-   * @param lockid requested by the user, or null if the user didn't already hold lock
-   * @param row the row to lock
-   * @param waitForLock if true, will block until the lock is available, otherwise will
-   * simply return null if it could not acquire the lock.
-   * @return lockid or null if waitForLock is false and the lock was unavailable.
+   * If the given list of row locks is not null, releases all locks.
    */
-  public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
-  throws IOException {
-    Integer lid = null;
-    if (lockid == null) {
-      lid = internalObtainRowLock(row, waitForLock);
-    } else {
-      if (!isRowLocked(lockid)) {
-        throw new IOException("Invalid row lock");
+  public void releaseRowLocks(List<RowLock> rowLocks) {
+    if (rowLocks != null) {
+      for (RowLock rowLock : rowLocks) {
+        rowLock.release();
       }
-      lid = lockid;
+      rowLocks.clear();
     }
-    return lid;
   }
 
   /**
@@ -4583,24 +4489,19 @@ public class HRegion implements HeapSize
     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
     boolean locked = false;
     boolean walSyncSuccessful = false;
-    List<Integer> acquiredLocks = null;
+    List<RowLock> acquiredRowLocks = null;
     long addedSize = 0;
     List<KeyValue> mutations = new ArrayList<KeyValue>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     try {
       // 2. Acquire the row lock(s)
-      acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
+      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
-        // Attempt to lock all involved rows, fail if one lock times out
-        Integer lid = getLock(null, row, true);
-        if (lid == null) {
-          throw new IOException("Failed to acquire lock on "
-              + Bytes.toStringBinary(row));
-        }
-        acquiredLocks.add(lid);
+        // Attempt to lock all involved rows, throw if any lock times out
+        acquiredRowLocks.add(getRowLock(row));
       }
       // 3. Region lock
-      lock(this.updatesLock.readLock(), acquiredLocks.size());
+      lock(this.updatesLock.readLock(), acquiredRowLocks.size());
       locked = true;
 
       long now = EnvironmentEdgeManager.currentTimeMillis();
@@ -4635,12 +4536,8 @@ public class HRegion implements HeapSize
           }
 
           // 9. Release row lock(s)
-          if (acquiredLocks != null) {
-            for (Integer lid : acquiredLocks) {
-              releaseRowLock(lid);
-            }
-            acquiredLocks = null;
-          }
+          releaseRowLocks(acquiredRowLocks);
+
           // 10. Sync edit log
           if (txid != 0) {
             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
@@ -4665,12 +4562,8 @@ public class HRegion implements HeapSize
           this.updatesLock.readLock().unlock();
           locked = false;
         }
-        if (acquiredLocks != null) {
-          for (Integer lid : acquiredLocks) {
-            releaseRowLock(lid);
-          }
-        }
-
+        // release locks if some were acquired but another timed out
+        releaseRowLocks(acquiredRowLocks);
       }
 
       // 12. Run post-process hook
@@ -4765,125 +4658,129 @@ public class HRegion implements HeapSize
     startRegionOperation(Operation.APPEND);
     this.writeRequestsCount.increment();
     WriteEntry w = null;
+    RowLock rowLock = null;
     try {
-      Integer lid = getLock(null, row, true);
-      lock(this.updatesLock.readLock());
-      // wait for all prior MVCC transactions to finish - while we hold the row lock
-      // (so that we are guaranteed to see the latest state)
-      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
-      // now start my own transaction
-      w = mvcc.beginMemstoreInsert();
+      rowLock = getRowLock(row);
       try {
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Process each family
-        for (Map.Entry<byte[], List<? extends Cell>> family : append.getFamilyMap().entrySet()) {
-
-          Store store = stores.get(family.getKey());
-          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
-
-          // Get previous values for all columns in this family
-          Get get = new Get(row);
-          for (Cell cell : family.getValue()) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            get.addColumn(family.getKey(), kv.getQualifier());
-          }
-          List<KeyValue> results = get(get, false);
-
-          // Iterate the input columns and update existing values if they were
-          // found, otherwise add new column initialized to the append value
-
-          // Avoid as much copying as possible. Every byte is copied at most
-          // once.
-          // Would be nice if KeyValue had scatter/gather logic
-          int idx = 0;
-          for (Cell cell : family.getValue()) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            KeyValue newKV;
-            if (idx < results.size()
-                && results.get(idx).matchingQualifier(kv.getBuffer(),
-                    kv.getQualifierOffset(), kv.getQualifierLength())) {
-              KeyValue oldKv = results.get(idx);
-              // allocate an empty kv once
-              newKV = new KeyValue(row.length, kv.getFamilyLength(),
-                  kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  oldKv.getValueLength() + kv.getValueLength());
-              // copy in the value
-              System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
-                  newKV.getBuffer(), newKV.getValueOffset(),
-                  oldKv.getValueLength());
-              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
-                  newKV.getBuffer(),
-                  newKV.getValueOffset() + oldKv.getValueLength(),
-                  kv.getValueLength());
-              idx++;
-            } else {
-              // allocate an empty kv once
-              newKV = new KeyValue(row.length, kv.getFamilyLength(),
-                  kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  kv.getValueLength());
-              // copy in the value
-              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
-                  newKV.getBuffer(), newKV.getValueOffset(),
-                  kv.getValueLength());
+        lock(this.updatesLock.readLock());
+        // wait for all prior MVCC transactions to finish - while we hold the row lock
+        // (so that we are guaranteed to see the latest state)
+        mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+        // now start my own transaction
+        w = mvcc.beginMemstoreInsert();
+        try {
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          // Process each family
+          for (Map.Entry<byte[], List<? extends Cell>> family : append.getFamilyMap().entrySet()) {
+  
+            Store store = stores.get(family.getKey());
+            List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+  
+            // Get previous values for all columns in this family
+            Get get = new Get(row);
+            for (Cell cell : family.getValue()) {
+              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+              get.addColumn(family.getKey(), kv.getQualifier());
             }
-            // copy in row, family, and qualifier
-            System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
-                newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
-            System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
-                newKV.getBuffer(), newKV.getFamilyOffset(),
-                kv.getFamilyLength());
-            System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
-                newKV.getBuffer(), newKV.getQualifierOffset(),
-                kv.getQualifierLength());
-
-            newKV.setMemstoreTS(w.getWriteNumber());
-            kvs.add(newKV);
-
-            // Append update to WAL
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
+            List<KeyValue> results = get(get, false);
+  
+            // Iterate the input columns and update existing values if they were
+            // found, otherwise add new column initialized to the append value
+  
+            // Avoid as much copying as possible. Every byte is copied at most
+            // once.
+            // Would be nice if KeyValue had scatter/gather logic
+            int idx = 0;
+            for (Cell cell : family.getValue()) {
+              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+              KeyValue newKV;
+              if (idx < results.size()
+                  && results.get(idx).matchingQualifier(kv.getBuffer(),
+                      kv.getQualifierOffset(), kv.getQualifierLength())) {
+                KeyValue oldKv = results.get(idx);
+                // allocate an empty kv once
+                newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                    kv.getQualifierLength(), now, KeyValue.Type.Put,
+                    oldKv.getValueLength() + kv.getValueLength());
+                // copy in the value
+                System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
+                    newKV.getBuffer(), newKV.getValueOffset(),
+                    oldKv.getValueLength());
+                System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                    newKV.getBuffer(),
+                    newKV.getValueOffset() + oldKv.getValueLength(),
+                    kv.getValueLength());
+                idx++;
+              } else {
+                // allocate an empty kv once
+                newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                    kv.getQualifierLength(), now, KeyValue.Type.Put,
+                    kv.getValueLength());
+                // copy in the value
+                System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                    newKV.getBuffer(), newKV.getValueOffset(),
+                    kv.getValueLength());
+              }
+              // copy in row, family, and qualifier
+              System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
+                  newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
+              System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
+                  newKV.getBuffer(), newKV.getFamilyOffset(),
+                  kv.getFamilyLength());
+              System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
+                  newKV.getBuffer(), newKV.getQualifierOffset(),
+                  kv.getQualifierLength());
+  
+              newKV.setMemstoreTS(w.getWriteNumber());
+              kvs.add(newKV);
+  
+              // Append update to WAL
+              if (writeToWAL) {
+                if (walEdits == null) {
+                  walEdits = new WALEdit();
+                }
+                walEdits.add(newKV);
               }
-              walEdits.add(newKV);
             }
-          }
-
-          //store the kvs to the temporary memstore before writing HLog
-          tempMemstore.put(store, kvs);
-        }
-
-        // Actually write to WAL now
-        if (writeToWAL) {
-          // Using default cluster id, as this can only happen in the orginating
-          // cluster. A slave cluster receives the final value (not the delta)
-          // as a Put.
-          txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
-            walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
-            this.htableDescriptor);
-        } else {
-          recordMutationWithoutWal(append.getFamilyMap());
-        }
-
-        //Actually write to Memstore now
-        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
-          Store store = entry.getKey();
-          if (store.getFamily().getMaxVersions() == 1) {
-            // upsert if VERSIONS for this CF == 1
-            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+  
+            //store the kvs to the temporary memstore before writing HLog
+            tempMemstore.put(store, kvs);
+          }
+  
+          // Actually write to WAL now
+          if (writeToWAL) {
+            // Using default cluster id, as this can only happen in the orginating
+            // cluster. A slave cluster receives the final value (not the delta)
+            // as a Put.
+            txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
+              walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+              this.htableDescriptor);
           } else {
-            // otherwise keep older versions around
-            for (Cell cell: entry.getValue()) {
-              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-              size += store.add(kv);
+            recordMutationWithoutWal(append.getFamilyMap());
+          }
+  
+          //Actually write to Memstore now
+          for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+            Store store = entry.getKey();
+            if (store.getFamily().getMaxVersions() == 1) {
+              // upsert if VERSIONS for this CF == 1
+              size += store.upsert(entry.getValue(), getSmallestReadPoint());
+            } else {
+              // otherwise keep older versions around
+              for (Cell cell: entry.getValue()) {
+                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+                size += store.add(kv);
+              }
             }
+            allKVs.addAll(entry.getValue());
           }
-          allKVs.addAll(entry.getValue());
+          size = this.addAndGetGlobalMemstoreSize(size);
+          flush = isFlushSize(size);
+        } finally {
+          this.updatesLock.readLock().unlock();
         }
-        size = this.addAndGetGlobalMemstoreSize(size);
-        flush = isFlushSize(size);
       } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
+        rowLock.release();
       }
       if (writeToWAL) {
         // sync the transaction log outside the rowlock
@@ -4936,100 +4833,103 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
-      Integer lid = getLock(null, row, true);
-      lock(this.updatesLock.readLock());
-      // wait for all prior MVCC transactions to finish - while we hold the row lock
-      // (so that we are guaranteed to see the latest state)
-      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
-      // now start my own transaction
-      w = mvcc.beginMemstoreInsert();
+      RowLock rowLock = getRowLock(row);
       try {
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Process each family
-        for (Map.Entry<byte [], List<? extends Cell>> family:
-            increment.getFamilyMap().entrySet()) {
-
-          Store store = stores.get(family.getKey());
-          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
-
-          // Get previous values for all columns in this family
-          Get get = new Get(row);
-          for (Cell cell: family.getValue()) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            get.addColumn(family.getKey(), kv.getQualifier());
-          }
-          get.setTimeRange(tr.getMin(), tr.getMax());
-          List<KeyValue> results = get(get, false);
-
-          // Iterate the input columns and update existing values if they were
-          // found, otherwise add new column initialized to the increment amount
-          int idx = 0;
-          for (Cell cell: family.getValue()) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            long amount = Bytes.toLong(kv.getValue());
-            byte [] qualifier = kv.getQualifier();
-            if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) {
-              kv = results.get(idx);
-              if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
-                amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
-              } else {
-                // throw DoNotRetryIOException instead of IllegalArgumentException
-                throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
-                    "Attempted to increment field that isn't 64 bits wide");
-              }
-              idx++;
+        lock(this.updatesLock.readLock());
+        // wait for all prior MVCC transactions to finish - while we hold the row lock
+        // (so that we are guaranteed to see the latest state)
+        mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+        // now start my own transaction
+        w = mvcc.beginMemstoreInsert();
+        try {
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          // Process each family
+          for (Map.Entry<byte [], List<? extends Cell>> family:
+              increment.getFamilyMap().entrySet()) {
+  
+            Store store = stores.get(family.getKey());
+            List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+  
+            // Get previous values for all columns in this family
+            Get get = new Get(row);
+            for (Cell cell: family.getValue()) {
+              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+              get.addColumn(family.getKey(), kv.getQualifier());
             }
-
-            // Append new incremented KeyValue to list
-            KeyValue newKV =
-              new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount));
-            newKV.setMemstoreTS(w.getWriteNumber());
-            kvs.add(newKV);
-
-            // Prepare WAL updates
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
+            get.setTimeRange(tr.getMin(), tr.getMax());
+            List<KeyValue> results = get(get, false);
+  
+            // Iterate the input columns and update existing values if they were
+            // found, otherwise add new column initialized to the increment amount
+            int idx = 0;
+            for (Cell cell: family.getValue()) {
+              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+              long amount = Bytes.toLong(kv.getValue());
+              byte [] qualifier = kv.getQualifier();
+              if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) {
+                kv = results.get(idx);
+                if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
+                  amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
+                } else {
+                  // throw DoNotRetryIOException instead of IllegalArgumentException
+                  throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
+                      "Attempted to increment field that isn't 64 bits wide");
+                }
+                idx++;
+              }
+  
+              // Append new incremented KeyValue to list
+              KeyValue newKV =
+                new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount));
+              newKV.setMemstoreTS(w.getWriteNumber());
+              kvs.add(newKV);
+  
+              // Prepare WAL updates
+              if (writeToWAL) {
+                if (walEdits == null) {
+                  walEdits = new WALEdit();
+                }
+                walEdits.add(newKV);
               }
-              walEdits.add(newKV);
             }
-          }
-
-          //store the kvs to the temporary memstore before writing HLog
-          tempMemstore.put(store, kvs);
-        }
-
-        // Actually write to WAL now
-        if (writeToWAL) {
-          // Using default cluster id, as this can only happen in the orginating
-          // cluster. A slave cluster receives the final value (not the delta)
-          // as a Put.
-          txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
-              walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
-              this.htableDescriptor);
-        } else {
-          recordMutationWithoutWal(increment.getFamilyMap());
-        }
-        //Actually write to Memstore now
-        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
-          Store store = entry.getKey();
-          if (store.getFamily().getMaxVersions() == 1) {
-            // upsert if VERSIONS for this CF == 1
-            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+  
+            //store the kvs to the temporary memstore before writing HLog
+            tempMemstore.put(store, kvs);
+          }
+  
+          // Actually write to WAL now
+          if (writeToWAL) {
+            // Using default cluster id, as this can only happen in the orginating
+            // cluster. A slave cluster receives the final value (not the delta)
+            // as a Put.
+            txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
+                walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+                this.htableDescriptor);
           } else {
-            // otherwise keep older versions around
-            for (Cell cell : entry.getValue()) {
-              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-              size += store.add(kv);
+            recordMutationWithoutWal(increment.getFamilyMap());
+          }
+          //Actually write to Memstore now
+          for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+            Store store = entry.getKey();
+            if (store.getFamily().getMaxVersions() == 1) {
+              // upsert if VERSIONS for this CF == 1
+              size += store.upsert(entry.getValue(), getSmallestReadPoint());
+            } else {
+              // otherwise keep older versions around
+              for (Cell cell : entry.getValue()) {
+                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+                size += store.add(kv);
+              }
             }
+            allKVs.addAll(entry.getValue());
           }
-          allKVs.addAll(entry.getValue());
+          size = this.addAndGetGlobalMemstoreSize(size);
+          flush = isFlushSize(size);
+        } finally {
+          this.updatesLock.readLock().unlock();
         }
-        size = this.addAndGetGlobalMemstoreSize(size);
-        flush = isFlushSize(size);
       } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
+        rowLock.release();
       }
       if (writeToWAL) {
         // sync the transaction log outside the rowlock
@@ -5069,22 +4969,32 @@ public class HRegion implements HeapSize
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (12 * Bytes.SIZEOF_LONG) +
-      2 * Bytes.SIZEOF_BOOLEAN);
-
+      38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      (11 * Bytes.SIZEOF_LONG) +
+      4 * Bytes.SIZEOF_BOOLEAN);
+
+  // woefully out of date - currently missing:
+  // 1 x HashMap - coprocessorServiceHandlers
+  // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
+  //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
+  //   writeRequestsCount, updatesBlockedMs
+  // 1 x HRegion$WriteState - writestate
+  // 1 x RegionCoprocessorHost - coprocessorHost
+  // 1 x RegionSplitPolicy - splitPolicy
+  // 1 x MetricsRegion - metricsRegion
+  // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
       ClassSize.OBJECT + // closeLock
       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
-      ClassSize.ATOMIC_INTEGER + // lockIdGenerator
-      (3 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds, scannerReadPoints
+      (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
       WriteState.HEAP_SIZE + // writestate
       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
       ClassSize.ARRAYLIST + // recentFlushes
       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
       + ClassSize.TREEMAP // maxSeqIdInStores
+      + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
       ;
 
   @Override
@@ -5093,7 +5003,7 @@ public class HRegion implements HeapSize
     for (Store store : this.stores.values()) {
       heapSize += store.heapSize();
     }
-    // this does not take into account row locks, recent flushes, mvcc entries
+    // this does not take into account row locks, recent flushes, mvcc entries, and more
     return heapSize;
   }
 
@@ -5657,4 +5567,68 @@ public class HRegion implements HeapSize
      */
     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
   }
+  
+  @VisibleForTesting class RowLockContext {
+    private final HashedBytes row;
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private final Thread thread;
+    private int lockCount = 0;
+
+    RowLockContext(HashedBytes row) {
+      this.row = row;
+      this.thread = Thread.currentThread();
+    }
+    
+    boolean ownedByCurrentThread() {
+      return thread == Thread.currentThread();
+    }
+    
+    RowLock newLock() {
+      lockCount++;
+      return new RowLock(this);
+    }
+    
+    void releaseLock() {
+      if (!ownedByCurrentThread()) {
+        throw new IllegalArgumentException("Lock held by thread: " + thread
+          + " cannot be released by different thread: " + Thread.currentThread());
+      }
+      lockCount--;
+      if (lockCount == 0) {
+        // no remaining locks by the thread, unlock and allow other threads to access
+        RowLockContext existingContext = lockedRows.remove(row);
+        if (existingContext != this) {
+          throw new RuntimeException(
+              "Internal row lock state inconsistent, should not happen, row: " + row);
+        }
+        latch.countDown();
+      }
+    }
+  }
+  
+  /**
+   * Row lock held by a given thread.
+   * One thread may acquire multiple locks on the same row simultaneously.
+   * The locks must be released by calling release() from the same thread.
+   */
+  public class RowLock {
+    @VisibleForTesting final RowLockContext context;
+    private boolean released = false;
+    
+    @VisibleForTesting RowLock(RowLockContext context) {
+      this.context = context;
+    }
+    
+    /**
+     * Release the given lock.  If there are no remaining locks held by the current thread
+     * then unlock the row and allow other threads to acquire the lock.
+     * @throws IllegalArgumentException if called by a different thread than the lock owning thread
+     */
+    public void release() {
+      if (!released) {
+        context.releaseLock();
+        released = true;
+      }
+    }
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jul 17 05:34:00 2013
@@ -3956,8 +3956,7 @@ public class HRegionServer implements Cl
    */
   protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
       final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
-    @SuppressWarnings("unchecked")
-    Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
+    Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
@@ -3974,7 +3973,7 @@ public class HRegionServer implements Cl
           mutation = ProtobufUtil.toDelete(m, cells);
           batchContainsDelete = true;
         }
-        mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
+        mArray[i++] = mutation;
         builder.addResult(result);
       }
 
@@ -3983,7 +3982,7 @@ public class HRegionServer implements Cl
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
+      OperationStatus codes[] = region.batchMutate(mArray);
       for (i = 0; i < codes.length; i++) {
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Wed Jul 17 05:34:00 2013
@@ -993,7 +993,7 @@ public class RegionCoprocessorHost
    * @throws IOException
    */
   public boolean preBatchMutate(
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     boolean bypass = false;
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env : coprocessors) {
@@ -1018,7 +1018,7 @@ public class RegionCoprocessorHost
    * @throws IOException
    */
   public void postBatchMutate(
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env : coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Wed Jul 17 05:34:00 2013
@@ -1061,7 +1061,7 @@ public class HBaseFsck extends Configure
         "You may need to restore the previously sidelined .META.");
       return false;
     }
-    meta.put(puts.toArray(new Put[0]));
+    meta.batchMutate(puts.toArray(new Put[0]));
     HRegion.closeHRegion(meta);
     LOG.info("Success! .META. table rebuilt.");
     LOG.info("Old .META. is moved into " + backupDir);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Wed Jul 17 05:34:00 2013
@@ -407,7 +407,7 @@ public class SimpleRegionObserver extend
   
   @Override
   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     RegionCoprocessorEnvironment e = c.getEnvironment();
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -417,7 +417,7 @@ public class SimpleRegionObserver extend
 
   @Override
   public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     RegionCoprocessorEnvironment e = c.getEnvironment();
     assertNotNull(e);
     assertNotNull(e.getRegion());

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Wed Jul 17 05:34:00 2013
@@ -59,11 +59,8 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
-
 
 /**
  * Testing of HRegion.incrementColumnValue, HRegion.increment,
@@ -528,16 +525,12 @@ public class TestAtomicOperation extends
     final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(
         Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(family));
 
-    List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
     Put[] puts = new Put[1];
     Put put = new Put(Bytes.toBytes("r1"));
     put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
     puts[0] = put;
-    Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
-
-    putsAndLocks.add(pair);
-
-    region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+    
+    region.batchMutate(puts);
     MultithreadedTestUtil.TestContext ctx =
       new MultithreadedTestUtil.TestContext(conf);
     ctx.addThread(new PutThread(ctx, region));
@@ -565,15 +558,12 @@ public class TestAtomicOperation extends
     }
 
     public void doWork() throws Exception {
-      List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
       Put[] puts = new Put[1];
       Put put = new Put(Bytes.toBytes("r1"));
       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
       puts[0] = put;
-      Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
-      putsAndLocks.add(pair);
       testStep = TestStep.PUT_STARTED;
-      region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+      region.batchMutate(puts);
     }
   }
 
@@ -607,43 +597,50 @@ public class TestAtomicOperation extends
     }
 
     @Override
-    public void releaseRowLock(Integer lockId) {
-      if (testStep == TestStep.INIT) {
-        super.releaseRowLock(lockId);
-        return;
+    public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
+      if (testStep == TestStep.CHECKANDPUT_STARTED) {
+        latch.countDown();
       }
+      return new WrappedRowLock(super.getRowLock(row, waitForLock));
+    }
+    
+    public class WrappedRowLock extends RowLock {
 
-      if (testStep == TestStep.PUT_STARTED) {
-        try {
-          testStep = TestStep.PUT_COMPLETED;
-          super.releaseRowLock(lockId);
-          // put has been written to the memstore and the row lock has been released, but the
-          // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
-          // operations would cause the non-atomicity to show up:
-          // 1) Put releases row lock (where we are now)
-          // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
-          //    because the MVCC has not advanced
-          // 3) Put advances MVCC
-          // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
-          // (see below), and then wait some more to give the checkAndPut time to read the old
-          // value.
-          latch.await();
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      }
-      else if (testStep == TestStep.CHECKANDPUT_STARTED) {
-        super.releaseRowLock(lockId);
+      private WrappedRowLock(RowLock rowLock) {
+        super(rowLock.context);
       }
-    }
 
-    @Override
-    public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException {
-      if (testStep == TestStep.CHECKANDPUT_STARTED) {
-        latch.countDown();
+      @Override
+      public void release() {
+        if (testStep == TestStep.INIT) {
+          super.release();
+          return;
+        }
+
+        if (testStep == TestStep.PUT_STARTED) {
+          try {
+            testStep = TestStep.PUT_COMPLETED;
+            super.release();
+            // put has been written to the memstore and the row lock has been released, but the
+            // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
+            // operations would cause the non-atomicity to show up:
+            // 1) Put releases row lock (where we are now)
+            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
+            //    because the MVCC has not advanced
+            // 3) Put advances MVCC
+            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
+            // (see below), and then wait some more to give the checkAndPut time to read the old
+            // value.
+            latch.await();
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        else if (testStep == TestStep.CHECKANDPUT_STARTED) {
+          super.release();
+        }
       }
-      return super.getLock(lockid, row, waitForLock);
     }
   }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jul 17 05:34:00 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.client.Du
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -96,6 +95,7 @@ import org.apache.hadoop.hbase.monitorin
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
@@ -764,7 +763,6 @@ public class TestHRegion extends HBaseTe
     }
   }
 
-  @SuppressWarnings("unchecked")
   public void testBatchPut() throws Exception {
     byte[] b = Bytes.toBytes(getName());
     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
@@ -783,7 +781,7 @@ public class TestHRegion extends HBaseTe
         puts[i].add(cf, qual, val);
       }
 
-      OperationStatus[] codes = this.region.put(puts);
+      OperationStatus[] codes = this.region.batchMutate(puts);
       assertEquals(10, codes.length);
       for (int i = 0; i < 10; i++) {
         assertEquals(OperationStatusCode.SUCCESS, codes[i]
@@ -794,7 +792,7 @@ public class TestHRegion extends HBaseTe
 
       LOG.info("Next a batch put with one invalid family");
       puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
-      codes = this.region.put(puts);
+      codes = this.region.batchMutate(puts);
       assertEquals(10, codes.length);
       for (int i = 0; i < 10; i++) {
         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
@@ -804,7 +802,7 @@ public class TestHRegion extends HBaseTe
       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
 
       LOG.info("Next a batch put that has to break into two batches to avoid a lock");
-      Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
+      RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
 
       MultithreadedTestUtil.TestContext ctx =
         new MultithreadedTestUtil.TestContext(conf);
@@ -813,7 +811,7 @@ public class TestHRegion extends HBaseTe
       TestThread putter = new TestThread(ctx) {
         @Override
         public void doWork() throws IOException {
-          retFromThread.set(region.put(puts));
+          retFromThread.set(region.batchMutate(puts));
         }
       };
       LOG.info("...starting put thread while holding lock");
@@ -829,7 +827,7 @@ public class TestHRegion extends HBaseTe
         }
       }
       LOG.info("...releasing row lock, which should let put thread continue");
-      region.releaseRowLock(lockedRow);
+      rowLock.release();
       LOG.info("...joining on thread");
       ctx.stop();
       LOG.info("...checking that next batch was synced");
@@ -840,29 +838,6 @@ public class TestHRegion extends HBaseTe
           OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
       }
 
-      LOG.info("Nexta, a batch put which uses an already-held lock");
-      lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
-      LOG.info("...obtained row lock");
-      List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
-      for (int i = 0; i < 10; i++) {
-        Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[i], null);
-        if (i == 2) pair.setSecond(lockedRow);
-        putsAndLocks.add(pair);
-      }
-
-      codes = region.batchMutate(putsAndLocks.toArray(new Pair[0]));
-      LOG.info("...performed put");
-      for (int i = 0; i < 10; i++) {
-        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
-          OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
-      }
-      // Make sure we didn't do an extra batch
-      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 5, source);
-
-      // Make sure we still hold lock
-      assertTrue(region.isRowLocked(lockedRow));
-      LOG.info("...releasing lock");
-      region.releaseRowLock(lockedRow);
     } finally {
       HRegion.closeHRegion(this.region);
        this.region = null;
@@ -891,7 +866,7 @@ public class TestHRegion extends HBaseTe
         puts[i].add(cf, qual, val);
       }
 
-      OperationStatus[] codes = this.region.put(puts);
+      OperationStatus[] codes = this.region.batchMutate(puts);
       assertEquals(10, codes.length);
       for (int i = 0; i < 10; i++) {
         assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i]

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java?rev=1504002&r1=1504001&r2=1504002&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java Wed Jul 17 05:34:00 2013
@@ -233,7 +233,7 @@ public class TestParallelPut extends HBa
         put.add(fam1, qual1, value);
         in[0] = put;
         try {
-          OperationStatus[] ret = region.put(in);
+          OperationStatus[] ret = region.batchMutate(in);
           assertEquals(1, ret.length);
           assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
           assertGet(rowkey, fam1, qual1, value);