Posted to commits@hbase.apache.org by te...@apache.org on 2016/11/23 15:05:55 UTC

hbase git commit: HBASE-17158 Avoid deadlock caused by HRegion#doDelta (ChiaPing Tsai)

Repository: hbase
Updated Branches:
  refs/heads/master 511398f43 -> 9f5b8a83b


HBASE-17158 Avoid deadlock caused by HRegion#doDelta (ChiaPing Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9f5b8a83
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9f5b8a83
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9f5b8a83

Branch: refs/heads/master
Commit: 9f5b8a83b70cfb4aaf1e22be666a2516a0aa50ac
Parents: 511398f
Author: tedyu <yu...@gmail.com>
Authored: Wed Nov 23 07:05:46 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Wed Nov 23 07:05:46 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 42 +++++++++++++--
 .../hbase/client/TestFromClientSide3.java       | 57 ++++++++++++++++++--
 2 files changed, 91 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9f5b8a83/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6b41bc4..f074b0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -349,6 +349,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private volatile Optional<ConfigurationManager> configurationManager;
 
+  // Used for testing.
+  private volatile Long timeoutForWriteLock = null;
+
   /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included in every
@@ -1446,6 +1449,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.closing.set(closing);
   }
 
+  /**
+   * {@link HRegion#doClose} will block forever if someone tries to prove the deadlock via a unit test.
+   * Instead of blocking, {@link HRegion#doClose} will throw an exception if you set the timeout.
+   * @param timeoutForWriteLock the number of seconds to wait for the write lock in {@link HRegion#doClose}
+   */
+  @VisibleForTesting
+  public void setTimeoutForWriteLock(long timeoutForWriteLock) {
+    assert timeoutForWriteLock >= 0;
+    this.timeoutForWriteLock = timeoutForWriteLock;
+  }
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH",
       justification="I think FindBugs is confused")
   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
@@ -1484,8 +1498,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
-    // block waiting for the lock for closing
-    lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
+    if (timeoutForWriteLock == null
+        || timeoutForWriteLock == Long.MAX_VALUE) {
+      // block waiting for the lock for closing
+      lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
+    } else {
+      try {
+        boolean succeed = lock.writeLock().tryLock(timeoutForWriteLock, TimeUnit.SECONDS);
+        if (!succeed) {
+          throw new IOException("Failed to get write lock when closing region");
+        }
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      }
+    }
     this.closing.set(true);
     status.setStatus("Disabling writes for close");
     try {
@@ -5345,6 +5371,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @VisibleForTesting
+  public int getReadLockCount() {
+    return lock.getReadLockCount();
+  }
+
   public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
     return lockedRows;
   }
@@ -7257,9 +7288,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WriteEntry writeEntry = null;
     startRegionOperation(op);
     List<Cell> results = returnResults? new ArrayList<Cell>(mutation.size()): null;
-    RowLock rowLock = getRowLockInternal(mutation.getRow(), false);
+    RowLock rowLock = null;
     MemstoreSize memstoreSize = new MemstoreSize();
     try {
+      rowLock = getRowLockInternal(mutation.getRow(), false);
       lock(this.updatesLock.readLock());
       try {
         Result cpResult = doCoprocessorPreCall(op, mutation);
@@ -7307,7 +7339,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // the client. Means only way to read-your-own-increment or append is to come in with an
       // a 0 increment.
       if (writeEntry != null) mvcc.complete(writeEntry);
-      rowLock.release();
+      if (rowLock != null) {
+        rowLock.release();
+      }
       // Request a cache flush if over the limit.  Do it outside update lock.
       if (isFlushSize(addAndGetMemstoreSize(memstoreSize))) {
         requestFlush();

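----------------------------------------------------------------------
The doClose() change above replaces an unconditional writeLock().lock() with
a bounded tryLock whenever a test has set a timeout, so a stuck close surfaces
as an exception instead of a hang. A minimal self-contained sketch of that
pattern follows; the class and member names here are illustrative, not the
actual HRegion internals:

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative sketch: a close path that optionally bounds its wait for
    // the write lock. A leaked read lock then shows up as a fast test
    // failure rather than a close that blocks forever.
    class BoundedCloseSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile Long timeoutForWriteLock = null; // null means block forever

      void acquireCloseLock() throws IOException {
        if (timeoutForWriteLock == null || timeoutForWriteLock == Long.MAX_VALUE) {
          lock.writeLock().lock(); // production behavior: wait indefinitely
          return;
        }
        try {
          // Test behavior: give up after the configured number of seconds.
          if (!lock.writeLock().tryLock(timeoutForWriteLock, TimeUnit.SECONDS)) {
            throw new IOException("Failed to get write lock when closing region");
          }
        } catch (InterruptedException e) {
          throw (InterruptedIOException) new InterruptedIOException().initCause(e);
        }
      }
    }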
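The deadlock fix itself is the doDelta change: getRowLockInternal() moves
inside the try block, so a row-lock timeout can no longer skip the cleanup in
the finally block, and the new null check releases only a lock that was
actually acquired. A self-contained sketch of that pattern, with illustrative
names and a simplified cleanup path (the real finally block also completes
the mvcc write entry and may request a flush):

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative sketch of the acquire-inside-try pattern. If acquiring
    // the row lock throws, the finally block still runs, so the region read
    // lock taken up front can never leak and block a later close.
    class LockLeakFixSketch {
      interface RowLock { void release(); }
      interface RowLockSupplier { RowLock acquire() throws IOException; }

      private final ReentrantReadWriteLock regionLock = new ReentrantReadWriteLock();

      void doDeltaLike(RowLockSupplier rowLocks) throws IOException {
        regionLock.readLock().lock();        // startRegionOperation()
        RowLock rowLock = null;
        try {
          rowLock = rowLocks.acquire();      // may time out and throw
          // ... apply the increment/append under the row lock ...
        } finally {
          if (rowLock != null) {
            rowLock.release();               // release only what was acquired
          }
          regionLock.readLock().unlock();    // closeRegionOperation()
        }
      }
    }

The new getReadLockCount() accessor exists so the test below can assert that
the read lock taken by startRegionOperation() was returned even on the
failure path.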
http://git-wip-us.apache.org/repos/asf/hbase/blob/9f5b8a83/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index cbc97a2..9fc20ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -23,14 +23,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,7 +54,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -568,6 +564,59 @@ public class TestFromClientSide3 {
   }
 
   @Test(timeout = 30000)
+  public void testLockLeakWithDelta() throws Exception, Throwable {
+    TableName tableName = TableName.valueOf("testLockLeakWithDelta");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    TEST_UTIL.getAdmin().createTable(desc);
+    // create a new connection with a lower retry number.
+    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
+    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    try (Connection con = ConnectionFactory.createConnection(copy)) {
+      HRegion region = (HRegion) find(tableName);
+      region.setTimeoutForWriteLock(10);
+      ExecutorService putService = Executors.newSingleThreadExecutor();
+      putService.execute(() -> {
+        try (Table table = con.getTable(tableName)) {
+          Put put = new Put(ROW);
+          put.addColumn(FAMILY, QUALIFIER, VALUE);
+          // the put will be blocked by WatiingForMultiMutationsObserver.
+          table.put(put);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+      ExecutorService appendService = Executors.newSingleThreadExecutor();
+      appendService.execute(() -> {
+        Append append = new Append(ROW);
+        append.add(FAMILY, QUALIFIER, VALUE);
+        try (Table table = con.getTable(tableName)) {
+          table.append(append);
+          fail("The APPEND should fail because the target lock is blocked by previous put");
+        } catch (Throwable ex) {
+        }
+      });
+      appendService.shutdown();
+      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+      observer.latch.countDown();
+      putService.shutdown();
+      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      try (Table table = con.getTable(tableName)) {
+        Result r = table.get(new Get(ROW));
+        assertFalse(r.isEmpty());
+        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
+      }
+    }
+    HRegion region = (HRegion) find(tableName);
+    int readLockCount = region.getReadLockCount();
+    LOG.info("readLockCount:" + readLockCount);
+    assertEquals(0, readLockCount);
+  }
+
+  @Test(timeout = 30000)
   public void testMultiRowMutations() throws Exception, Throwable {
     TableName tableName = TableName.valueOf("testMultiRowMutations");
     HTableDescriptor desc = new HTableDescriptor(tableName);
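
----------------------------------------------------------------------
How the new testLockLeakWithDelta stages the bug: the coprocessor latch holds
the put's row lock open, the append exhausts its 5-second row-lock wait (with
client retries capped at 2) and fails, and setTimeoutForWriteLock(10) makes
any region close fail fast instead of hanging if that failed append had
leaked the region read lock. The closing assertEquals(0, readLockCount) is
the regression check: before this patch, the append's row-lock timeout
skipped doDelta's cleanup and left the count above zero.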