You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/12/01 01:08:09 UTC

hbase git commit: HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang)

Repository: hbase
Updated Branches:
  refs/heads/branch-1.4 21eb8ba6d -> 6c490625a


HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang)

This patch, together with sorting of the batch (HBASE-17924), fixes a
regression in Increment/CheckAndPut-style operations.

Signed-off-by: Yu Li <ca...@gmail.com>
Signed-off-by: Allan Yang <al...@163.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6c490625
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6c490625
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6c490625

Branch: refs/heads/branch-1.4
Commit: 6c490625aa102ed33a99352b7b308e2a9c2f3c35
Parents: 21eb8ba
Author: Michael Stack <st...@apache.org>
Authored: Tue Nov 28 09:14:58 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Thu Nov 30 17:08:06 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 100 +++++++++----
 .../hadoop/hbase/client/TestMultiParallel.java  | 148 +++++++++++++++++++
 .../hbase/regionserver/TestAtomicOperation.java |   5 +-
 3 files changed, 225 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6c490625/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1b178f6..06f2990 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2191,7 +2191,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Should the store be flushed because it is old enough.
    * <p>
    * Every FlushPolicy should call this to determine whether a store is old enough to flush(except
-   * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
+   * that you always flush all stores). Otherwise the shouldFlush method will always
    * returns true which will make a lot of flush requests.
    */
   boolean shouldFlushStore(Store store) {
@@ -3243,11 +3243,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           continue;
         }
 
+
+        //HBASE-18233
         // If we haven't got any rows in our batch, we should block to
-        // get the next one.
+        // get the next one's read lock. We need at least one row to mutate.
+        // If we have got rows, do not block when lock is not available,
+        // so that we can fail fast and go on with the rows with locks in
+        // the batch. By doing this, we can reduce contention and prevent
+        // possible deadlocks.
+        // The unfinished rows in the batch will be detected in batchMutate,
+        // and it will try to finish them by calling doMiniBatchMutation again.
+        boolean shouldBlock = numReadyToWrite == 0;
         RowLock rowLock = null;
         try {
-          rowLock = getRowLockInternal(mutation.getRow(), true);
+          rowLock = getRowLockInternal(mutation.getRow(), true, shouldBlock);
         } catch (TimeoutIOException e) {
           // We will retry when other exceptions, but we should stop if we timeout .
           throw e;
@@ -3256,8 +3265,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             + Bytes.toStringBinary(mutation.getRow()), ioe);
         }
         if (rowLock == null) {
-          // We failed to grab another lock
-          break; // stop acquiring more rows for this batch
+          // We failed to grab another lock. Stop acquiring more rows for this
+          // batch and proceed with the rows already locked.
+          break;
+
         } else {
           acquiredRowLocks.add(rowLock);
         }
@@ -3356,7 +3367,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now);
 
               // Acquire row locks. If not, the whole batch will fail.
-              acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
+              acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true, true));
 
               // Returned mutations from coprocessor correspond to the Mutation at index i. We can
               // directly add the cells from those mutations to the familyMaps of this mutation.
@@ -3676,7 +3687,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       get.addColumn(family, qualifier);
       checkRow(row, "checkAndMutate");
       // Lock row - note that doBatchMutate will relock this row if called
-      RowLock rowLock = getRowLockInternal(get.getRow(), false);
+      RowLock rowLock = getRowLockInternal(get.getRow());
       // wait for all previous transactions to complete (with lock held)
       mvcc.await();
       try {
@@ -3786,7 +3797,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       get.addColumn(family, qualifier);
       checkRow(row, "checkAndRowMutate");
       // Lock row - note that doBatchMutate will relock this row if called
-      RowLock rowLock = getRowLockInternal(get.getRow(), false);
+      RowLock rowLock = getRowLockInternal(get.getRow());
       // wait for all previous transactions to complete (with lock held)
       mvcc.await();
       try {
@@ -4025,10 +4036,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * <b>not</b> check the families for validity.
    *
    * @param familyMap Map of kvs per family
-   * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
-   *        If null, then this method internally creates a mvcc transaction.
-   * @param output newly added KVs into memstore
-   * @param isInReplay true when adding replayed KVs into memstore
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    * @throws IOException
@@ -5429,7 +5436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Get an exclusive ( write lock ) lock on a given row.
    * @param row Which row to lock.
    * @return A locked RowLock. The lock is exclusive and already aqquired.
-   * @throws IOException
+   * @throws IOException if any error occurred
    */
   public RowLock getRowLock(byte[] row) throws IOException {
     return getRowLock(row, false);
@@ -5437,12 +5444,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
+    return getRowLock(row, readLock, true);
+  }
+
+  /**
+   *
+   * Get a row lock for the specified row. All locks are reentrant.
+   *
+   * Before calling this function make sure that a region operation has already been
+   * started (the calling thread has already acquired the region-close-guard lock).
+   * @param row The row actions will be performed against
+   * @param readLock whether the lock is a reader or writer lock. True indicates that a
+   *          non-exclusive (read) lock is requested
+   * @param waitForLock whether to wait (up to the row lock timeout) for this lock
+   * @return A locked RowLock, or null if {@code waitForLock} set to false and tryLock failed
+   * @throws IOException if any error occurred
+   */
+  public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException {
     // Make sure the row is inside of this region before getting the lock for it.
     checkRow(row, "row lock");
-    return getRowLockInternal(row, readLock);
+    return getRowLockInternal(row, readLock, waitForLock);
   }
 
-  protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException {
+  // getRowLock calls checkRow. Call this to skip checkRow.
+  protected RowLock getRowLockInternal(byte[] row)
+  throws IOException {
+    return getRowLockInternal(row, false, true);
+  }
+
+  protected RowLock getRowLockInternal(byte[] row, boolean readLock, boolean waitForLock)
+  throws IOException {
     // create an object to use a a key in the row lock map
     HashedBytes rowKey = new HashedBytes(row);
 
@@ -5493,18 +5524,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
 
-      if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
+      boolean lockAvailable = false;
+      if (timeout > 0) {
+        if (waitForLock) {
+          // if waiting for lock, wait for timeout milliseconds
+          lockAvailable = result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS);
+        } else {
+          // If we are not waiting, tryLock() returns immediately whether we have the lock or not.
+          lockAvailable = result.getLock().tryLock();
+        }
+      }
+      if (!lockAvailable) {
         if (traceScope != null) {
           traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
         }
         result = null;
-        String message = "Timed out waiting for lock for row: " + rowKey + " in region "
-            + getRegionInfo().getEncodedName();
-        if (reachDeadlineFirst) {
-          throw new TimeoutIOException(message);
+        String message = "Timed out waiting for lock for row: " + rowKey + " in region " +
+            getRegionInfo().getEncodedName() + ", timeout=" + timeout + ", deadlined=" +
+            reachDeadlineFirst + ", waitForLock=" + waitForLock;
+        if (waitForLock) {
+          if (reachDeadlineFirst) {
+            LOG.info("TIMEOUT: " + message);
+            throw new TimeoutIOException(message);
+          } else {
+            // If timeToDeadline is larger than rowLockWaitDuration, we cannot drop the request.
+            LOG.info("IOE " + message);
+            throw new IOException(message);
+          }
         } else {
-          // If timeToDeadline is larger than rowLockWaitDuration, we can not drop the request.
-          throw new IOException(message);
+          // We are here if we did a tryLock w/o waiting on it.
+          return null;
         }
       }
       rowLockContext.setThreadName(Thread.currentThread().getName());
@@ -7393,7 +7442,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (byte[] row : rowsToLock) {
           // Attempt to lock all involved rows, throw if any lock times out
           // use a writer lock for mixed reads and writes
-          acquiredRowLocks.add(getRowLockInternal(row, false));
+          acquiredRowLocks.add(getRowLockInternal(row));
         }
         // 3. Region lock
         lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
@@ -7625,7 +7674,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WALKey walKey = null;
     boolean doRollBackMemstore = false;
     try {
-      rowLock = getRowLockInternal(row, false);
+      rowLock = getRowLockInternal(row);
       assert rowLock != null;
       try {
         lock(this.updatesLock.readLock());
@@ -7907,7 +7956,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     Map<Store, List<Cell>> forMemStore = new HashMap<>();
     Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
     try {
-      rowLock = getRowLockInternal(increment.getRow(), false);
+      rowLock = getRowLockInternal(increment.getRow());
       long txid = 0;
       try {
         lock(this.updatesLock.readLock());
@@ -8126,7 +8175,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * from <code>incrementCoordinates</code> only.
    * @param increment
    * @param columnFamily
-   * @param incrementCoordinates
    * @return Return the Cells to Increment
    * @throws IOException
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6c490625/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 4a80a26..2e5cc54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -63,6 +64,7 @@ public class TestMultiParallel {
   private static final byte[] QUALIFIER = Bytes.toBytes("qual");
   private static final String FAMILY = "family";
   private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
+  private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2");
   private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
   private static final byte [][] KEYS = makeKeys();
@@ -761,4 +763,150 @@ public class TestMultiParallel {
       validateEmpty(result);
     }
   }
+
+  private static class MultiThread extends Thread {
+    public Throwable throwable = null;
+    private CountDownLatch endLatch;
+    private CountDownLatch beginLatch;
+    List<Put> puts;
+    public MultiThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+    @Override
+    public void run() {
+      LOG.info("Start multi");
+      HTable table = null;
+      try {
+        table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        table.setAutoFlush(false);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for(Put put : puts) {
+            table.put(put);
+          }
+          table.flushCommits();
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when put:", t);
+      } finally {
+        endLatch.countDown();
+        if(table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+            LOG.error("Error when close table", ioe);
+          }
+        }
+      }
+      LOG.info("End multi");
+    }
+  }
+
+
+  private static class IncrementThread extends Thread {
+    public Throwable throwable = null;
+    private CountDownLatch endLatch;
+    private CountDownLatch beginLatch;
+    List<Put> puts;
+    public IncrementThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+    @Override
+    public void run() {
+      LOG.info("Start inc");
+      HTable table = null;
+      try {
+        table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for(Put put : puts) {
+            Increment inc = new Increment(put.getRow());
+            inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1);
+            table.increment(inc);
+          }
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when incr:", t);
+      } finally {
+        endLatch.countDown();
+        if(table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+            LOG.error("Error when close table", ioe);
+          }
+        }
+      }
+      LOG.info("End inc");
+    }
+  }
+
+  /**
+   * Unit test for HBASE-18233: batch mutations submitted in different row orders
+   * and concurrent increments must not deadlock each other.
+   * @throws Exception if any error occurred
+   */
+  @Test(timeout=300000)
+  public void testMultiThreadWithRowLocks() throws Exception {
+    // Set short timeouts so that a failure to get a row lock surfaces as a timeout exception
+    UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000);
+    UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000);
+    UTIL.getConfiguration().setInt("hbase.client.retries.number", 10);
+
+    UTIL.createTable(TEST_TABLE2, BYTES_FAMILY);
+    List<Put> puts = new ArrayList<>();
+    for(int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long)0));
+      puts.add(put);
+    }
+    List<Put> reversePuts = new ArrayList<>(puts);
+    Collections.reverse(reversePuts);
+    int NUM_OF_THREAD = 12;
+    CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD);
+    CountDownLatch beginLatch = new CountDownLatch(1);
+    int threadNum = NUM_OF_THREAD / 4;
+    List<MultiThread> multiThreads = new ArrayList<>();
+    List<IncrementThread> incThreads = new ArrayList<>();
+    for(int i = 0; i < threadNum; i ++) {
+      MultiThread thread = new MultiThread(reversePuts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for(int i = 0; i < threadNum; i++) {
+      MultiThread thread = new MultiThread(puts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for(int i = 0; i < threadNum; i ++) {
+      IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    for(int i = 0; i < threadNum; i++) {
+      IncrementThread thread = new IncrementThread(puts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    long timeBegin = System.currentTimeMillis();
+    beginLatch.countDown();
+    latch.await();
+    LOG.error("Time took:" + (System.currentTimeMillis() - timeBegin));
+    for(MultiThread thread : multiThreads) {
+      if (thread != null && thread.throwable != null) {
+        LOG.error(thread.throwable);
+      }
+      Assert.assertTrue(thread.throwable == null);
+    }
+    for(IncrementThread thread : incThreads) {
+      Assert.assertTrue(thread.throwable == null);
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6c490625/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index edef899..d9415df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -663,11 +663,12 @@ public class TestAtomicOperation {
     }
 
     @Override
-    public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
+    public RowLock getRowLockInternal(final byte[] row, boolean readLock, boolean waitForLock)
+      throws IOException {
       if (testStep == TestStep.CHECKANDPUT_STARTED) {
         latch.countDown();
       }
-      return new WrappedRowLock(super.getRowLockInternal(row, readLock));
+      return new WrappedRowLock(super.getRowLockInternal(row, readLock, waitForLock));
     }
 
     public class WrappedRowLock implements RowLock {