You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/09/29 22:07:53 UTC

[3/3] hbase git commit: HBASE-16721 Concurrency issue in WAL unflushed seqId tracking

HBASE-16721 Concurrency issue in WAL unflushed seqId tracking


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cf374af1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cf374af1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cf374af1

Branch: refs/heads/branch-1.2
Commit: cf374af102f139a6176d05b97201bfa8d9f687be
Parents: 42dff8a
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Sep 29 13:50:58 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Sep 29 14:55:45 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  19 ++--
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |   2 +-
 .../hbase/regionserver/wal/TestFSHLog.java      | 100 ++++++++++++++++++-
 3 files changed, 109 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cf374af1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 22c66e3..f93b5a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2216,6 +2216,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     long trxId = 0;
     MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
+    // wait for all in-progress transactions to commit to WAL before
+    // we can start the flush. This prevents
+    // uncommitted transactions from being written into HFiles.
+    // We have to block before we start the flush, otherwise keys that
+    // were removed via a rollbackMemstore could be written to Hfiles.
+    mvcc.completeAndWait(writeEntry);
+    // set writeEntry to null to prevent mvcc.complete from being called again inside finally
+    // block
+    writeEntry = null;
     try {
       try {
         if (wal != null) {
@@ -2294,16 +2303,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           throw ioe;
         }
       }
-
-      // wait for all in-progress transactions to commit to WAL before
-      // we can start the flush. This prevents
-      // uncommitted transactions from being written into HFiles.
-      // We have to block before we start the flush, otherwise keys that
-      // were removed via a rollbackMemstore could be written to Hfiles.
-      mvcc.completeAndWait(writeEntry);
-      // set writeEntry to null to prevent mvcc.complete from being called again inside finally
-      // block
-      writeEntry = null;
     } finally {
       if (writeEntry != null) {
         // In case of failure just mark current writeEntry as complete.

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf374af1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d2b336e..041a5b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -47,7 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface WAL {
+public interface WAL extends AutoCloseable {
 
   /**
    * Registers WALActionsListener

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf374af1/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 6ece700..e09b621 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -30,6 +30,10 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
@@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -343,8 +348,8 @@ public class TestFSHLog {
    * by slowing appends in the background ring buffer thread while in foreground we call
    * flush.  The addition of the sync over HRegion in flush should fix an issue where flush was
    * returning before all of its appends had made it out to the WAL (HBASE-11109).
+   * see HBASE-11109
    * @throws IOException
-   * @see HBASE-11109
    */
   @Test
   public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
@@ -448,4 +453,97 @@ public class TestFSHLog {
       log.close();
     }
   }
+
+  /**
+   * Test case for https://issues.apache.org/jira/browse/HBASE-16721. Holds a put inside the WAL
+   * append while a flush starts, then checks that no unflushed seqId remains for the region.
+   */
+  @Test (timeout = 30000)
+  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
+    final String name = "testUnflushedSeqIdTracking";
+    final byte[] b = Bytes.toBytes("b");
+    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    final CountDownLatch holdAppend = new CountDownLatch(1);
+    final CountDownLatch flushFinished = new CountDownLatch(1);
+    final CountDownLatch putFinished = new CountDownLatch(1);
+
+    try (FSHLog log =
+        new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
+            null, true, null, null)) {
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
+          if (startHoldingForAppend.get()) {
+            try {
+              holdAppend.await();
+            } catch (InterruptedException e) {
+              LOG.error(e);
+            }
+          }
+        }
+      });
+
+      // open a new region which uses this WAL
+      HTableDescriptor htd =
+          new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
+      HRegionInfo hri =
+          new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
+      ExecutorService exec = Executors.newFixedThreadPool(2);
+      try {
+        // do a regular write first because of memstore size calculation.
+        region.put(new Put(b).addColumn(b, b,b));
+
+        startHoldingForAppend.set(true);
+        exec.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              region.put(new Put(b).addColumn(b, b,b));
+              putFinished.countDown();
+            } catch (IOException e) {
+              LOG.error(e);
+            }
+          }
+        });
+
+        // give the put a chance to start
+        Threads.sleep(3000);
+
+        exec.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              Region.FlushResult flushResult = region.flush(true);
+              LOG.info("Flush result:" +  flushResult.getResult());
+              LOG.info("Flush succeeded:" +  flushResult.isFlushSucceeded());
+              flushFinished.countDown();
+            } catch (IOException e) {
+              LOG.error(e);
+            }
+          }
+        });
+
+        // give the flush a chance to start. Flush should have got the region lock, and
+        // should have been waiting on the mvcc complete after this.
+        Threads.sleep(3000);
+
+        // let the append to WAL go through now that the flush already started
+        holdAppend.countDown();
+        putFinished.await();
+        flushFinished.await();
+
+        // check whether flush went through
+        assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
+
+        // now check the region's unflushed seqIds.
+        assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM,
+            log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes()));
+      } finally {
+        // release the worker threads and the region even when an await or assertion fails
+        exec.shutdownNow();
+        region.close();
+      }
+    }
+  }
 }