You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/03 14:51:23 UTC

svn commit: r1393468 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: client/HConnectionManager.java regionserver/HRegion.java

Author: mbautin
Date: Wed Oct  3 12:51:23 2012
New Revision: 1393468

URL: http://svn.apache.org/viewvc?rev=1393468&view=rev
Log:
[HBASE-6930] Avoid acquiring the same row lock repeatedly

Author: liyintang

Summary:
When processing multiPut, multiMutations or multiDelete operations, each IPC handler thread tries to acquire a lock for each row key in the batch. If a batch contains duplicate row keys, the IPC handler thread previously acquired the same row lock again and again.

So the optimization is to sort each batch operation by row key on the client side, and to skip re-acquiring the same row lock on the server side.

Test Plan: Will test it on the fbtrace cluster.

Reviewers: kannan

Reviewed By: kannan

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D590376

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1393468&r1=1393467&r2=1393468&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Oct  3 12:51:23 2012
@@ -2083,7 +2083,10 @@ public class HConnectionManager {
       // exception reporting.  We keep HRegionLocation to save on parsing.
       // Later below when we use lastServers, we'll pull what we need from
       // lastServers.
+      // Sort the puts based on the row key in order to optimize the row lock acquiring
+      // in the server side.
       List<Mutation> workingList = orig_list;
+      Collections.sort(workingList);
 
       for (int tries = 0;
            workingList != null && !workingList.isEmpty() && tries < numRetries;
@@ -2555,7 +2558,11 @@ public class HConnectionManager {
       int tries;
       long serverRequestedWaitTime = 0;
       int serverRequestedRetries = 0;
-
+      
+      // Sort the puts based on the row key in order to optimize the row lock acquiring
+      // in the server side.
+      Collections.sort(list);
+      
       for ( tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
         // If server requested wait. We will wait for that time, and start
         // again. Do not count this time/tries against the client retries.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1393468&r1=1393467&r2=1393468&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Oct  3 12:51:23 2012
@@ -2014,6 +2014,10 @@ public class HRegion implements HeapSize
       // we acquire at least one.
       // ----------------------------------
       int numReadyToWrite = 0;
+      byte[] previousRow = null;
+      Integer previousLockID = null;
+      Integer currentLockID = null;
+      
       while (lastIndexExclusive < batchOp.operations.length) {
         Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
         Mutation op = nextPair.getFirst();
@@ -2024,22 +2028,30 @@ public class HRegion implements HeapSize
           checkFamilies(op.getFamilyMap().keySet());
           checkTimestamps(op, now);
         }
-
-        // If we haven't got any rows in our batch, we should block to
-        // get the next one.
-        boolean shouldBlock = numReadyToWrite == 0;
-        Integer acquiredLockId = getLock(providedLockId, op.getRow(), shouldBlock);
-        if (acquiredLockId == null) {
-          // We failed to grab another lock
-          assert !shouldBlock : "Should never fail to get lock when blocking";
-          break; // stop acquiring more rows for this batch
-        }
-        if (providedLockId == null) {
-          acquiredLocks.add(acquiredLockId);
+        
+        if (previousRow == null || !Bytes.equals(previousRow, op.getRow()) ||
+            (providedLockId != null && !previousLockID.equals(providedLockId))) {
+          // If we haven't got any rows in our batch, we should block to
+          // get the next one.
+          boolean shouldBlock = numReadyToWrite == 0;
+          currentLockID = getLock(providedLockId, op.getRow(), shouldBlock);
+          if (currentLockID == null) {
+            // We failed to grab another lock
+            assert !shouldBlock : "Should never fail to get lock when blocking";
+            break; // stop acquiring more rows for this batch
+          }
+          
+          if (providedLockId == null) {
+            acquiredLocks.add(currentLockID);
+          }
+          
+          // reset the previous row and lockID with the current one
+          previousRow = op.getRow();
+          previousLockID = currentLockID;
         }
         lastIndexExclusive++;
         numReadyToWrite++;
-
+        
         // if first time around, designate expected signature for metric
         // else, if all have been consistent so far, check if it still holds
         // all else, designate failure signature and mark as unclear
@@ -2115,12 +2127,11 @@ public class HRegion implements HeapSize
       success = true;
       return addedSize;
     } finally {
-      if (locked)
+      if (locked) {
         this.updatesLock.readLock().unlock();
-
-      for (Integer toRelease : acquiredLocks) {
-        releaseRowLock(toRelease);
       }
+      
+      releaseRowLocks(acquiredLocks);
 
       // do after lock
       long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -2879,6 +2890,20 @@ public class HRegion implements HeapSize
       lockedRows.notifyAll();
     }
   }
+  
+  /**
+   * Release all of the given row locks, then wake any threads waiting on lockedRows.
+   * @param lockidList The list of the lock IDs to release.
+   */
+  void releaseRowLocks(final List<Integer> lockidList) {
+    synchronized (lockedRows) {
+      for (Integer lockid : lockidList) {
+        byte[] row = lockIds.remove(lockid);
+        lockedRows.remove(row);
+      }
+      lockedRows.notifyAll(); // must be called while holding the lockedRows monitor
+    }
+  }
 
   /**
    * See if row is currently locked.