You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/11/19 07:03:11 UTC

hbase git commit: HBASE-16989 RowProcess#postBatchMutate doesn’t be executed before the mvcc transaction completion. (ChiaPing Tsai)

Repository: hbase
Updated Branches:
  refs/heads/master fa7ed58e1 -> a3a56b638


HBASE-16989 RowProcess#postBatchMutate doesn’t be executed before the mvcc transaction completion. (ChiaPing Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a3a56b63
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a3a56b63
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a3a56b63

Branch: refs/heads/master
Commit: a3a56b6380eedaab60d1dfdb993070d045f87045
Parents: fa7ed58
Author: anoopsamjohn <an...@gmail.com>
Authored: Sat Nov 19 12:32:49 2016 +0530
Committer: anoopsamjohn <an...@gmail.com>
Committed: Sat Nov 19 12:32:49 2016 +0530

----------------------------------------------------------------------
 .../hbase/coprocessor/RegionObserver.java       |   4 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  29 ++---
 .../hbase/client/TestFromClientSide3.java       | 122 ++++++++++++++++++-
 3 files changed, 134 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a3a56b63/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index ccdce03..586545c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -682,7 +682,9 @@ public interface RegionObserver extends Coprocessor {
 
   /**
    * This will be called after applying a batch of Mutations on a region. The Mutations are added to
-   * memstore and WAL.
+   * memstore and WAL. The difference of this one with {@link #postPut(ObserverContext, Put, WALEdit, Durability) }
+   * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability) } is
+   * this hook will be executed before the mvcc transaction completion.
    * <p>
    * Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
    * If need a Cell reference for later use, copy the cell and use that.

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3a56b63/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 31b2a15..4d35b51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3330,6 +3330,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
       }
 
+      // calling the post CP hook for batch mutation
+      if (!replay && coprocessorHost != null) {
+        MiniBatchOperationInProgress<Mutation> miniBatchOp =
+          new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
+          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
+        coprocessorHost.postBatchMutate(miniBatchOp);
+      }
+
       // STEP 6. Complete mvcc.
       if (replay) {
         this.mvcc.advanceTo(batchOp.getReplaySequenceId());
@@ -3346,14 +3354,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       releaseRowLocks(acquiredRowLocks);
 
-      // calling the post CP hook for batch mutation
-      if (!replay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Mutation> miniBatchOp =
-          new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
-          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
-        coprocessorHost.postBatchMutate(miniBatchOp);
-      }
-
       for (int i = firstIndex; i < lastIndexExclusive; i ++) {
         if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
           batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
@@ -7098,21 +7098,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               applyToMemstore(getHStore(cell), cell, memstoreSize);
             }
           }
-          // STEP 8. Complete mvcc.
+
+          // STEP 8. call postBatchMutate hook
+          processor.postBatchMutate(this);
+
+          // STEP 9. Complete mvcc.
           mvcc.completeAndWait(writeEntry);
           writeEntry = null;
 
-          // STEP 9. Release region lock
+          // STEP 10. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
             locked = false;
           }
 
-          // STEP 10. Release row lock(s)
+          // STEP 11. Release row lock(s)
           releaseRowLocks(acquiredRowLocks);
-
-          // STEP 11. call postBatchMutate hook
-          processor.postBatchMutate(this);
         }
         success = true;
       } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3a56b63/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index da033c6..82fbe77 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -19,19 +19,28 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,6 +53,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -479,4 +492,101 @@ public class TestFromClientSide3 {
     ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
     assertTrue(con.hasCellBlockSupport());
   }
+
+  @Test(timeout = 60000)
+  public void testPutWithPreBatchMutate ()throws Exception {
+    TableName tableName = TableName.valueOf("testPutWithPreBatchMutate");
+    testPreBatchMutate(tableName, () -> {
+      try {
+        Table t = TEST_UTIL.getConnection().getTable(tableName);
+        Put put = new Put(ROW);
+        put.addColumn(FAMILY, QUALIFIER, VALUE);
+        t.put(put);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
+  }
+
+  @Test(timeout = 60000)
+  public void testRowMutationsWithPreBatchMutate ()throws Exception {
+    TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate");
+    testPreBatchMutate(tableName, () -> {
+      try {
+        RowMutations rm = new RowMutations(ROW, 1);
+        Table t = TEST_UTIL.getConnection().getTable(tableName);
+        Put put = new Put(ROW);
+        put.addColumn(FAMILY, QUALIFIER, VALUE);
+        rm.add(put);
+        t.mutateRow(rm);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
+  }
+
+  private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addCoprocessor(WatiingForScanObserver.class.getName());
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    TEST_UTIL.getAdmin().createTable(desc);
+    ExecutorService service = Executors.newFixedThreadPool(2);
+    service.execute(rn);
+    final List<Cell> cells = new ArrayList<>();
+    service.execute(() -> {
+      try {
+        // waiting for update.
+        TimeUnit.SECONDS.sleep(3);
+        Table t = TEST_UTIL.getConnection().getTable(tableName);
+        Scan scan = new Scan();
+        try (ResultScanner scanner = t.getScanner(scan)) {
+          for (Result r : scanner) {
+            cells.addAll(Arrays.asList(r.rawCells()));
+          }
+        }
+      } catch (IOException | InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
+    service.shutdown();
+    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
+      + ", so the data is invisible to reader", 0, cells.size());
+    TEST_UTIL.deleteTable(tableName);
+  }
+
+  private static <T extends RegionObserver> T find(final TableName tableName,
+          Class<T> clz) throws IOException, InterruptedException {
+    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+    List<Region> regions = rs.getOnlineRegions(tableName);
+    assertEquals(1, regions.size());
+    Region region = regions.get(0);
+    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
+    assertTrue("The cp instance should be " + clz.getName()
+            + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
+    return clz.cast(cp);
+  }
+
+  public static class WatiingForScanObserver extends BaseRegionObserver {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      try {
+        // waiting for scanner
+        latch.await();
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      }
+    }
+
+    @Override
+    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
+            final Scan scan, final RegionScanner s) throws IOException {
+      latch.countDown();
+      return s;
+    }
+  }
 }