You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/12/05 19:28:06 UTC

hbase git commit: HBASE-12639 Backport HBASE-12565 Race condition in HRegion.batchMutate() causes partial data to be written when region closes

Repository: hbase
Updated Branches:
  refs/heads/0.98 5668b6c8e -> 45c8be3fc


HBASE-12639 Backport HBASE-12565 Race condition in HRegion.batchMutate() causes partial data to be written when region closes


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/45c8be3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/45c8be3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/45c8be3f

Branch: refs/heads/0.98
Commit: 45c8be3fc76c2813032c576f81439954a498d6d7
Parents: 5668b6c
Author: stack <st...@apache.org>
Authored: Fri Dec 5 10:27:57 2014 -0800
Committer: stack <st...@apache.org>
Committed: Fri Dec 5 10:27:57 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 107 ++++++++++---------
 .../hbase/regionserver/TestAtomicOperation.java |   4 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  | 107 +++++++++++++++----
 3 files changed, 144 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/45c8be3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index cebef32..1d554fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2245,25 +2245,22 @@ public class HRegion implements HeapSize { // , Writable{
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
-   * @param mutations the list of mutations
+   * @param batchOp contains the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
     boolean initialized = false;
-    while (!batchOp.isDone()) {
-      if (!batchOp.isInReplay()) {
-        checkReadOnly();
-      }
-      checkResources();
-
-      long newSize;
-      Operation op = Operation.BATCH_MUTATE;
-      if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
-      startRegionOperation(op);
+    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
+    startRegionOperation(op);
+    try {
+      while (!batchOp.isDone()) {
+        if (!batchOp.isInReplay()) {
+          checkReadOnly();
+        }
+        checkResources();
 
-      try {
         if (!initialized) {
           this.writeRequestsCount.add(batchOp.operations.length);
           if (!batchOp.isInReplay()) {
@@ -2272,13 +2269,13 @@ public class HRegion implements HeapSize { // , Writable{
           initialized = true;
         }
         long addedSize = doMiniBatchMutation(batchOp);
-        newSize = this.addAndGetGlobalMemstoreSize(addedSize);
-      } finally {
-        closeRegionOperation(op);
-      }
-      if (isFlushSize(newSize)) {
-        requestFlush();
+        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
+        if (isFlushSize(newSize)) {
+          requestFlush();
+        }
       }
+    } finally {
+      closeRegionOperation(op);
     }
     return batchOp.retCodeDetails;
   }
@@ -2404,7 +2401,7 @@ public class HRegion implements HeapSize { // , Writable{
         boolean shouldBlock = numReadyToWrite == 0;
         RowLock rowLock = null;
         try {
-          rowLock = getRowLock(mutation.getRow(), shouldBlock);
+          rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock in batch put, row="
             + Bytes.toStringBinary(mutation.getRow()), ioe);
@@ -3575,45 +3572,53 @@ public class HRegion implements HeapSize { // , Writable{
    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
    */
   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
-    checkRow(row, "row lock");
     startRegionOperation();
     try {
-      HashedBytes rowKey = new HashedBytes(row);
-      RowLockContext rowLockContext = new RowLockContext(rowKey);
+      return getRowLockInternal(row, waitForLock);
+    } finally {
+      closeRegionOperation();
+    }
+  }
 
-      // loop until we acquire the row lock (unless !waitForLock)
-      while (true) {
-        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
-        if (existingContext == null) {
-          // Row is not already locked by any thread, use newly created context.
-          break;
-        } else if (existingContext.ownedByCurrentThread()) {
-          // Row is already locked by current thread, reuse existing context instead.
-          rowLockContext = existingContext;
-          break;
-        } else {
-          // Row is already locked by some other thread, give up or wait for it
-          if (!waitForLock) {
-            return null;
-          }
-          try {
-            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
-              throw new IOException("Timed out waiting for lock for row: " + rowKey);
-            }
-          } catch (InterruptedException ie) {
-            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
-            InterruptedIOException iie = new InterruptedIOException();
-            iie.initCause(ie);
-            throw iie;
+  /**
+   * A version of getRowLock(byte[], boolean) to use when a region operation has already been
+   * started (the calling thread has already acquired the region-close-lock).
+   */
+  protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
+    checkRow(row, "row lock");
+    HashedBytes rowKey = new HashedBytes(row);
+    RowLockContext rowLockContext = new RowLockContext(rowKey);
+
+    // loop until we acquire the row lock (unless !waitForLock)
+    while (true) {
+      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+      if (existingContext == null) {
+        // Row is not already locked by any thread, use newly created context.
+        break;
+      } else if (existingContext.ownedByCurrentThread()) {
+        // Row is already locked by current thread, reuse existing context instead.
+        rowLockContext = existingContext;
+        break;
+      } else {
+        // Row is already locked by some other thread, give up or wait for it
+        if (!waitForLock) {
+          return null;
+        }
+        try {
+          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+            throw new IOException("Timed out waiting for lock for row: " + rowKey);
           }
+        } catch (InterruptedException ie) {
+          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
+          InterruptedIOException iie = new InterruptedIOException();
+          iie.initCause(ie);
+          throw iie;
         }
       }
-
-      // allocate new lock for this thread
-      return rowLockContext.newLock();
-    } finally {
-      closeRegionOperation();
     }
+
+    // allocate new lock for this thread
+    return rowLockContext.newLock();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/45c8be3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 257579a..96da717 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -612,11 +612,11 @@ public class TestAtomicOperation {
     }
 
     @Override
-    public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
+    public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException {
       if (testStep == TestStep.CHECKANDPUT_STARTED) {
         latch.countDown();
       }
-      return new WrappedRowLock(super.getRowLock(row, waitForLock));
+      return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
     }
     
     public class WrappedRowLock extends RowLock {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45c8be3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a26aeb5..777b3a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1005,12 +1005,11 @@ public class TestHRegion {
   }
 
   @Test
-  public void testBatchPut() throws Exception {
-    byte[] b = Bytes.toBytes(getName());
+  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
     byte[] qual = Bytes.toBytes("qual");
     byte[] val = Bytes.toBytes("val");
-    this.region = initHRegion(b, getName(), CONF, cf);
+    this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
     MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
     try {
       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
@@ -1040,9 +1039,34 @@ public class TestHRegion {
       }
 
       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
+  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
+    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
+    byte[] qual = Bytes.toBytes("qual");
+    byte[] val = Bytes.toBytes("val");
+    this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
+    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
+    try {
+      long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
+      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
+
+      final Put[] puts = new Put[10];
+      for (int i = 0; i < 10; i++) {
+        puts[i] = new Put(Bytes.toBytes("row_" + i));
+        puts[i].add(cf, qual, val);
+      }
+      puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
 
-      LOG.info("Next a batch put that has to break into two batches to avoid a lock");
-      RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
+      LOG.info("batchPut will have to break into four batches to avoid row locks");
+      RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
+      RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_4"));
+      RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_6"));
 
       MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
       final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
@@ -1052,36 +1076,77 @@ public class TestHRegion {
           retFromThread.set(region.batchMutate(puts));
         }
       };
-      LOG.info("...starting put thread while holding lock");
+      LOG.info("...starting put thread while holding locks");
       ctx.addThread(putter);
       ctx.startThreads();
 
-      LOG.info("...waiting for put thread to sync first time");
-      long startWait = System.currentTimeMillis();
-      while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
-        Thread.sleep(100);
-        if (System.currentTimeMillis() - startWait > 10000) {
-          fail("Timed out waiting for thread to sync first minibatch");
+      LOG.info("...waiting for put thread to sync 1st time");
+      waitForCounter(source, "syncTimeNumOps", syncs + 1);
+
+      // Now attempt to close the region from another thread.  Prior to HBASE-12565
+      // this would cause the in-progress batchMutate operation to fail with an
+      // exception because it used to release and re-acquire the close-guard lock
+      // between batches.  The caller then didn't get status indicating which writes succeeded.
+      // We now expect this thread to block until the batchMutate call finishes.
+      Thread regionCloseThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            HRegion.closeHRegion(region);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
         }
-      }
-      LOG.info("...releasing row lock, which should let put thread continue");
-      rowLock.release();
-      LOG.info("...joining on thread");
+      };
+      regionCloseThread.start();
+
+      LOG.info("...releasing row lock 1, which should let put thread continue");
+      rowLock1.release();
+
+      LOG.info("...waiting for put thread to sync 2nd time");
+      waitForCounter(source, "syncTimeNumOps", syncs + 2);
+
+      LOG.info("...releasing row lock 2, which should let put thread continue");
+      rowLock2.release();
+
+      LOG.info("...waiting for put thread to sync 3rd time");
+      waitForCounter(source, "syncTimeNumOps", syncs + 3);
+
+      LOG.info("...releasing row lock 3, which should let put thread continue");
+      rowLock3.release();
+
+      LOG.info("...waiting for put thread to sync 4th time");
+      waitForCounter(source, "syncTimeNumOps", syncs + 4);
+
+      LOG.info("...joining on put thread");
       ctx.stop();
-      LOG.info("...checking that next batch was synced");
-      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
-      codes = retFromThread.get();
-      for (int i = 0; i < 10; i++) {
+      regionCloseThread.join();
+
+      OperationStatus[] codes = retFromThread.get();
+      for (int i = 0; i < codes.length; i++) {
         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
             codes[i].getOperationStatusCode());
       }
-
     } finally {
       HRegion.closeHRegion(this.region);
       this.region = null;
     }
   }
 
+  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
+      throws InterruptedException {
+    long startWait = System.currentTimeMillis();
+    long currentCount;
+    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
+      Thread.sleep(100);
+      if (System.currentTimeMillis() - startWait > 10000) {
+        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
+          expectedCount, currentCount));
+      }
+    }
+  }
+
+
   @Test
   public void testBatchPutWithTsSlop() throws Exception {
     byte[] b = Bytes.toBytes(getName());