You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/09/29 22:07:52 UTC
[2/3] hbase git commit: HBASE-16721 Concurrency issue in WAL
unflushed seqId tracking
HBASE-16721 Concurrency issue in WAL unflushed seqId tracking
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f77f1530
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f77f1530
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f77f1530
Branch: refs/heads/branch-1.3
Commit: f77f1530d4cebd1679bc1c27782bc283638dbd5f
Parents: 728f58a
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Sep 29 13:50:58 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Sep 29 14:53:29 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 19 ++--
.../java/org/apache/hadoop/hbase/wal/WAL.java | 2 +-
.../hbase/regionserver/wal/TestFSHLog.java | 101 ++++++++++++++++++-
3 files changed, 110 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f77f1530/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d43e838..520286f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2293,6 +2293,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long trxId = 0;
MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
+ // wait for all in-progress transactions to commit to WAL before
+ // we can start the flush. This prevents
+ // uncommitted transactions from being written into HFiles.
+ // We have to block before we start the flush, otherwise keys that
+ // were removed via a rollbackMemstore could be written to Hfiles.
+ mvcc.completeAndWait(writeEntry);
+ // set writeEntry to null to prevent mvcc.complete from being called again inside finally
+ // block
+ writeEntry = null;
try {
try {
if (wal != null) {
@@ -2371,16 +2380,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw ioe;
}
}
-
- // wait for all in-progress transactions to commit to WAL before
- // we can start the flush. This prevents
- // uncommitted transactions from being written into HFiles.
- // We have to block before we start the flush, otherwise keys that
- // were removed via a rollbackMemstore could be written to Hfiles.
- mvcc.completeAndWait(writeEntry);
- // set writeEntry to null to prevent mvcc.complete from being called again inside finally
- // block
- writeEntry = null;
} finally {
if (writeEntry != null) {
// In case of failure just mark current writeEntry as complete.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f77f1530/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d2b336e..041a5b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -47,7 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface WAL {
+public interface WAL extends AutoCloseable {
/**
* Registers WALActionsListener
http://git-wip-us.apache.org/repos/asf/hbase/blob/f77f1530/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 6ece700..760cdc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -30,6 +30,10 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
@@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -343,8 +348,8 @@ public class TestFSHLog {
* by slowing appends in the background ring buffer thread while in foreground we call
* flush. The addition of the sync over HRegion in flush should fix an issue where flush was
* returning before all of its appends had made it out to the WAL (HBASE-11109).
+ * see HBASE-11109
* @throws IOException
- * @see HBASE-11109
*/
@Test
public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
@@ -448,4 +453,98 @@ public class TestFSHLog {
log.close();
}
}
+
+  /**
+   * Test case for https://issues.apache.org/jira/browse/HBASE-16721: a flush that races with a
+   * held-back WAL append must not leave a stale unflushed seqId behind for the region.
+   */
+  @Test (timeout = 30000)
+  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
+    final String name = "testUnflushedSeqIdTracking";
+    final byte[] b = Bytes.toBytes("b");
+
+    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    final AtomicBoolean backgroundFailed = new AtomicBoolean(false); // put/flush task failed
+    final CountDownLatch holdAppend = new CountDownLatch(1);
+    final CountDownLatch tasksFinished = new CountDownLatch(2); // background put + flush
+
+    try (FSHLog log =
+        new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
+            null, true, null, null)) {
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+            throws IOException {
+          if (startHoldingForAppend.get()) {
+            try {
+              holdAppend.await();
+            } catch (InterruptedException e) {
+              LOG.error(e);
+              Thread.currentThread().interrupt(); // do not swallow the interrupt
+            }
+          }
+        }
+      });
+      // open a new region which uses this WAL
+      HTableDescriptor htd =
+          new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
+      HRegionInfo hri =
+          new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
+      ExecutorService exec = Executors.newFixedThreadPool(2);
+      try {
+        // do a regular write first because of memstore size calculation.
+        region.put(new Put(b).addColumn(b, b, b));
+        startHoldingForAppend.set(true);
+        exec.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              region.put(new Put(b).addColumn(b, b, b));
+            } catch (IOException e) {
+              LOG.error(e);
+              backgroundFailed.set(true);
+            } finally {
+              tasksFinished.countDown(); // count down even on failure so await() cannot hang
+            }
+          }
+        });
+        // give the put a chance to start
+        Threads.sleep(3000);
+        exec.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              Region.FlushResult flushResult = region.flush(true);
+              LOG.info("Flush result:" + flushResult.getResult());
+              LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
+            } catch (IOException e) {
+              LOG.error(e);
+              backgroundFailed.set(true);
+            } finally {
+              tasksFinished.countDown();
+            }
+          }
+        });
+        // give the flush a chance to start. Flush should have got the region lock, and
+        // should have been waiting on the mvcc complete after this.
+        Threads.sleep(3000);
+
+        // let the append to WAL go through now that the flush already started
+        holdAppend.countDown();
+        tasksFinished.await();
+        assertEquals("Background put or flush failed", false, backgroundFailed.get());
+        // check whether flush went through
+        assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
+        // now check the region's unflushed seqIds.
+        long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
+        assertEquals("Found seqId for the region which is already flushed",
+            HConstants.NO_SEQNUM, seqId);
+      } finally {
+        holdAppend.countDown(); // no-op if already released; unblocks the WAL if we failed early
+        exec.shutdownNow();
+        region.close();
+      }
+    }
+  }
}