Posted to commits@hbase.apache.org by te...@apache.org on 2016/11/20 15:12:26 UTC

hbase git commit: HBASE-17131 Avoid livelock caused by HRegion#processRowsWithLocks (ChiaPing Tsai)

Repository: hbase
Updated Branches:
  refs/heads/master ec9c9e201 -> bb645bcfd


HBASE-17131 Avoid livelock caused by HRegion#processRowsWithLocks (ChiaPing Tsai)
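
In short: processRowsWithLocks used to acquire its row locks and the region
updates lock in the outer try block, before the inner try/finally that cleans
them up. A row-lock timeout partway through acquisition could therefore
propagate out with earlier locks still held, and every retry of the operation
would then block on those orphaned locks. The patch initializes locked and
acquiredRowLocks eagerly and moves the acquisition inside the inner try, so
the cleanup path always covers whatever was actually taken. A minimal sketch
of the resulting pattern (simplified from the hunk below; the finally-side
release is assumed from the surrounding HRegion code, which this diff does
not show):

    boolean locked = false;                 // initialized so cleanup can test it safely
    List<RowLock> acquiredRowLocks = null;  // null until acquisition begins
    try {
      acquiredRowLocks = new ArrayList<>(rowsToLock.size());
      for (byte[] row : rowsToLock) {
        // May throw on lock timeout; locks already taken are inside the try,
        // so the cleanup below releases them instead of leaking them.
        acquiredRowLocks.add(getRowLockInternal(row, false));
      }
      lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
      locked = true;
      // ... let the processor scan the rows, build mutations, append to the WAL ...
    } finally {
      if (locked) {
        this.updatesLock.readLock().unlock();  // assumed cleanup, mirroring HRegion's finally
      }
      releaseRowLocks(acquiredRowLocks);       // assumed helper; tolerates a null or partial list
    }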


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bb645bcf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bb645bcf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bb645bcf

Branch: refs/heads/master
Commit: bb645bcfda74ad1f96b16e6e47543d44fbca5a98
Parents: ec9c9e2
Author: tedyu <yu...@gmail.com>
Authored: Sun Nov 20 07:09:02 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Sun Nov 20 07:09:02 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  26 ++--
 .../hbase/client/TestFromClientSide3.java       | 124 +++++++++++++++++--
 2 files changed, 129 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bb645bcf/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4d35b51..c372faa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7044,8 +7044,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
 
-    boolean locked;
-    List<RowLock> acquiredRowLocks;
+    boolean locked = false;
+    List<RowLock> acquiredRowLocks = null;
     List<Mutation> mutations = new ArrayList<Mutation>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     // This is assigned by mvcc either explicitly in the below or in the guts of the WAL append
@@ -7053,19 +7053,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WriteEntry writeEntry = null;
     MemstoreSize memstoreSize = new MemstoreSize();
     try {
-      // STEP 2. Acquire the row lock(s)
-      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
-      for (byte[] row : rowsToLock) {
-        // Attempt to lock all involved rows, throw if any lock times out
-        // use a writer lock for mixed reads and writes
-        acquiredRowLocks.add(getRowLockInternal(row, false));
-      }
-      // STEP 3. Region lock
-      lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
-      locked = true;
       boolean success = false;
-      long now = EnvironmentEdgeManager.currentTime();
       try {
+        // STEP 2. Acquire the row lock(s)
+        acquiredRowLocks = new ArrayList<>(rowsToLock.size());
+        for (byte[] row : rowsToLock) {
+          // Attempt to lock all involved rows, throw if any lock times out
+          // use a writer lock for mixed reads and writes
+          acquiredRowLocks.add(getRowLockInternal(row, false));
+        }
+        // STEP 3. Region lock
+        lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
+        locked = true;
+        long now = EnvironmentEdgeManager.currentTime();
         // STEP 4. Let the processor scan the rows, generate mutations and add waledits
         doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
         if (!mutations.isEmpty()) {
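
The new test below exercises exactly this scenario: a put parks in
postBatchMutate while holding the row lock on ROW-1, then a
MultiRowMutationService call tries to lock ROW-0 and ROW-1 together. With the
fix, the endpoint call that times out on ROW-1 releases the lock it already
took, fails fast, and the closing assertNoLocks check passes once both threads
finish. Two settings keep the blocked call from retrying for minutes; a sketch
of applying the same knobs outside the test (property names as used in the
diff, values illustrative; Configuration is Hadoop's, the rest come from
org.apache.hadoop.hbase and org.apache.hadoop.hbase.client):

    Configuration conf = HBaseConfiguration.create();
    // Fail a blocked client call after two attempts instead of a long backoff.
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    // Abandon a contended row lock after 5 seconds. The test sets this per
    // table via HTableDescriptor#setConfiguration; shown here site-wide.
    conf.set("hbase.rowlock.wait.duration", "5000");
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // ... issue the puts / MultiRowMutation calls with these settings ...
    }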

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb645bcf/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 82fbe77..cbc97a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -23,13 +23,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -43,10 +47,17 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -54,6 +65,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -494,7 +506,7 @@ public class TestFromClientSide3 {
   }
 
   @Test(timeout = 60000)
-  public void testPutWithPreBatchMutate ()throws Exception {
+  public void testPutWithPreBatchMutate() throws Exception {
     TableName tableName = TableName.valueOf("testPutWithPreBatchMutate");
     testPreBatchMutate(tableName, () -> {
       try {
@@ -509,7 +521,7 @@ public class TestFromClientSide3 {
   }
 
   @Test(timeout = 60000)
-  public void testRowMutationsWithPreBatchMutate ()throws Exception {
+  public void testRowMutationsWithPreBatchMutate() throws Exception {
     TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate");
     testPreBatchMutate(tableName, () -> {
       try {
@@ -525,7 +537,7 @@ public class TestFromClientSide3 {
     });
   }
 
-  private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception {
+  private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
     HTableDescriptor desc = new HTableDescriptor(tableName);
     desc.addCoprocessor(WatiingForScanObserver.class.getName());
     desc.addFamily(new HColumnDescriptor(FAMILY));
@@ -555,22 +567,118 @@ public class TestFromClientSide3 {
     TEST_UTIL.deleteTable(tableName);
   }
 
-  private static <T extends RegionObserver> T find(final TableName tableName,
-          Class<T> clz) throws IOException, InterruptedException {
+  @Test(timeout = 30000)
+  public void testMultiRowMutations() throws Throwable {
+    TableName tableName = TableName.valueOf("testMultiRowMutations");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
+    desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    TEST_UTIL.getAdmin().createTable(desc);
+    // Create a new connection with a lower retry count.
+    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
+    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    try (Connection con = ConnectionFactory.createConnection(copy)) {
+      byte[] row = Bytes.toBytes("ROW-0");
+      byte[] rowLocked = Bytes.toBytes("ROW-1");
+      byte[] value0 = Bytes.toBytes("VALUE-0");
+      byte[] value1 = Bytes.toBytes("VALUE-1");
+      byte[] value2 = Bytes.toBytes("VALUE-2");
+      assertNoLocks(tableName);
+      ExecutorService putService = Executors.newSingleThreadExecutor();
+      putService.execute(() -> {
+        try (Table table = con.getTable(tableName)) {
+          Put put0 = new Put(rowLocked);
+          put0.addColumn(FAMILY, QUALIFIER, value0);
+          // This put will block in WatiingForMultiMutationsObserver while holding the row lock.
+          table.put(put0);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+      ExecutorService cpService = Executors.newSingleThreadExecutor();
+      cpService.execute(() -> {
+        Put put1 = new Put(row);
+        Put put2 = new Put(rowLocked);
+        put1.addColumn(FAMILY, QUALIFIER, value1);
+        put2.addColumn(FAMILY, QUALIFIER, value2);
+        try (Table table = con.getTable(tableName)) {
+          MultiRowMutationProtos.MutateRowsRequest request
+            = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
+              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1))
+              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2))
+              .build();
+          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
+            ROW, ROW,
+            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
+              ServerRpcController controller = new ServerRpcController();
+              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
+                rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
+              exe.mutateRows(controller, request, rpcCallback);
+              return rpcCallback.get();
+            });
+          fail("This cp should fail because the target lock is blocked by previous put");
+        } catch (Throwable ex) {
+        }
+      });
+      cpService.shutdown();
+      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+      observer.latch.countDown();
+      putService.shutdown();
+      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      try (Table table = con.getTable(tableName)) {
+        Get g0 = new Get(row);
+        Get g1 = new Get(rowLocked);
+        Result r0 = table.get(g0);
+        Result r1 = table.get(g1);
+        assertTrue(r0.isEmpty());
+        assertFalse(r1.isEmpty());
+        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
+      }
+      assertNoLocks(tableName);
+    }
+  }
+
+  private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
+    HRegion region = (HRegion) find(tableName);
+    assertEquals(0, region.getLockedRows().size());
+  }
+  private static Region find(final TableName tableName)
+      throws IOException, InterruptedException {
     HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
     List<Region> regions = rs.getOnlineRegions(tableName);
     assertEquals(1, regions.size());
-    Region region = regions.get(0);
+    return regions.get(0);
+  }
+
+  private static <T extends RegionObserver> T find(final TableName tableName,
+          Class<T> clz) throws IOException, InterruptedException {
+    Region region = find(tableName);
     Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
     assertTrue("The cp instance should be " + clz.getName()
             + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
     return clz.cast(cp);
   }
 
-  public static class WatiingForScanObserver extends BaseRegionObserver {
+  public static class WatiingForMultiMutationsObserver extends BaseRegionObserver {
+    final CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      try {
+        latch.await();
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      }
+    }
+  }
 
+  public static class WatiingForScanObserver extends BaseRegionObserver {
     private final CountDownLatch latch = new CountDownLatch(1);
-
     @Override
     public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
             final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {