You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:42:19 UTC

svn commit: r1181924 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: nspiegelberg
Date: Tue Oct 11 17:42:18 2011
New Revision: 1181924

URL: http://svn.apache.org/viewvc?rev=1181924&view=rev
Log:
backport tracking of oldest sequence id - Task #592226

Summary:
Backport from 0.90 to 0.89.

https://phabricator.fb.com/D252265 and
https://phabricator.fb.com/D292941

Test Plan:
TestHLog
TestWALReplay

Reviewed By: nspiegelberg
Reviewers: pkhemani, jgray, dhruba, nspiegelberg, kranganathan
Commenters: pkhemani
CC: hbase@lists, pkhemani, anirudht, nspiegelberg
Differential Revision: 293592

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181924&r1=1181923&r2=1181924&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 17:42:18 2011
@@ -1230,7 +1230,8 @@ public class HRegion implements HeapSize
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(
         stores.size());
     try {
-      sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
+      sequenceId = (wal == null)? myseqid :
+        wal.startCacheFlush(this.regionInfo.getRegionName());
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
 
       for (Store s : stores.values()) {
@@ -1304,7 +1305,9 @@ public class HRegion implements HeapSize
       // We used to only catch IOEs but its possible that we'd get other
       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
       // all and sundry.
-      if (wal != null) wal.abortCacheFlush();
+      if (wal != null) {
+        wal.abortCacheFlush(this.regionInfo.getRegionName());
+      }
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1181924&r1=1181923&r2=1181924&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 11 17:42:18 2011
@@ -1163,13 +1163,47 @@ public class HLog implements Syncable {
    * completion of a cache-flush. Otherwise the log-seq-id for the flush will
    * not appear in the correct logfile.
    *
+   * Ensuring that flushes and log-rolls don't happen concurrently also allows
+   * us to temporarily put a log-seq-number in lastSeqWritten against the region
+   * being flushed that might not be the earliest in-memory log-seq-number for
+   * that region. By the time the flush is completed or aborted and before the
+   * cacheFlushLock is released it is ensured that lastSeqWritten again has the
+   * oldest in-memory edit's lsn for the region that was being flushed.
+   *
+   * In this method, by removing the entry in lastSeqWritten for the region
+   * being flushed we ensure that the next edit inserted in this region will be
+   * correctly recorded in {@link #append(HRegionInfo, HLogKey, WALEdit)}. The
+   * lsn of the earliest in-memory edit - which is now in the memstore snapshot -
+   * is saved temporarily in the lastSeqWritten map while the flush is active.
+   *
    * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)}
    * (byte[], byte[], long)}
    * @see #completeCacheFlush(byte[], byte[], long, boolean)
    * @see #abortCacheFlush()
    */
-  public long startCacheFlush() {
+  public long startCacheFlush(final byte [] regionName) {
     this.cacheFlushLock.lock();
+    Long seq = this.lastSeqWritten.remove(regionName);
+    // seq is the lsn of the oldest edit associated with this region. If a
+    // snapshot already exists - because the last flush failed - then seq will
+    // be the lsn of the oldest edit in the snapshot
+    if (seq != null) {
+      // keeping the earliest sequence number of the snapshot in
+      // lastSeqWritten maintains the correctness of
+      // getOldestOutstandingSeqNum(). But it doesn't matter really because
+      // everything is being done inside of cacheFlush lock.
+      Long oldseq =
+        lastSeqWritten.put(regionName, seq);
+      if (oldseq != null) {
+        LOG.error("Logic Error Snapshot seq id from earlier flush still" +
+                  " present! for region " + Bytes.toString(regionName) +
+                  " overwritten oldseq=" + oldseq + "with new seq=" + seq);
+        Runtime.getRuntime().halt(1);
+      } else {
+        LOG.debug("Inserted lastSeqWritten of region snapshot " +
+                  regionName + " as " + seq);
+      }
+    }
     return obtainSeqNum();
   }
 
@@ -1184,9 +1218,8 @@ public class HLog implements Syncable {
    * @throws IOException
    */
   public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
-    final long logSeqId,
-    final boolean isMetaRegion)
-  throws IOException {
+                                 final long logSeqId, final boolean isMetaRegion)
+    throws IOException {
     try {
       synchronized (updateLock) {
         if (this.closed) {
@@ -1195,18 +1228,18 @@ public class HLog implements Syncable {
         long now = System.currentTimeMillis();
         WALEdit edit = completeCacheFlushLogEdit();
         HLogKey key = makeKey(regionName, tableName, logSeqId,
-            System.currentTimeMillis());
+                              System.currentTimeMillis());
         this.writer.append(new Entry(key, edit));
         this.numEntries.incrementAndGet();
-        Long seq = this.lastSeqWritten.get(regionName);
-        if (seq != null && logSeqId >= seq.longValue()) {
-          this.lastSeqWritten.remove(regionName);
-        }
       }
       // sync txn to file system
       this.sync(isMetaRegion);
 
     } finally {
+      // updateLock is not needed for removing the snapshot's entry.
+      // Cleaning up of lastSeqWritten is in the finally clause because we
+      // don't want to confuse getOldestOutstandingSeqNum().
+      this.lastSeqWritten.remove(regionName);
       this.cacheFlushLock.unlock();
     }
   }
@@ -1225,7 +1258,27 @@ public class HLog implements Syncable {
    * currently is a restart of the regionserver so the snapshot content dropped
    * by the failure gets restored to the memstore.
    */
-  public void abortCacheFlush() {
+  public void abortCacheFlush(byte[] regionName) {
+    LOG.debug("Aborting cache flush of region " +
+              Bytes.toString(regionName));
+    Long snapshot_seq =
+      this.lastSeqWritten.remove(regionName);
+    if (snapshot_seq != null) {
+      // updateLock is not necessary because we are racing against
+      // lastSeqWritten.putIfAbsent() in append() and we will always win.
+      // Before releasing cacheFlushLock, make sure that the region's entry in
+      // lastSeqWritten points to the earliest edit in the region.
+      Long current_memstore_earliest_seq =
+        this.lastSeqWritten.put(regionName, snapshot_seq);
+      if (current_memstore_earliest_seq != null &&
+          (current_memstore_earliest_seq.longValue() <=
+           snapshot_seq.longValue())) {
+        LOG.error("Logic Error region " + Bytes.toString(regionName) +
+                  "acquired edits out of order current memstore seq=" +
+                  current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
+        Runtime.getRuntime().halt(1);
+      }
+    }
     this.cacheFlushLock.unlock();
   }
 

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1181924&r1=1181923&r2=1181924&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Tue Oct 11 17:42:18 2011
@@ -448,7 +448,7 @@ public class TestHLog  {
         row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
       final byte [] regionName = info.getRegionName();
       log.append(info, tableName, cols, System.currentTimeMillis());
-      long logSeqId = log.startCacheFlush();
+      long logSeqId = log.startCacheFlush(info.getRegionName());
       log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
       log.close();
       Path filename = log.computeFilename();
@@ -516,7 +516,7 @@ public class TestHLog  {
       HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
       log.append(hri, tableName, cols, System.currentTimeMillis());
-      long logSeqId = log.startCacheFlush();
+      long logSeqId = log.startCacheFlush(hri.getRegionName());
       log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
       log.close();
       Path filename = log.computeFilename();
@@ -624,7 +624,7 @@ public class TestHLog  {
 
     // Flush the first region, we expect to see the first two files getting
     // archived
-    long seqId = log.startCacheFlush();
+    long seqId = log.startCacheFlush(hri.getRegionName());
     log.completeCacheFlush(hri.getRegionName(), tableName, seqId, false);
     log.rollWriter();
     assertEquals(2, log.getNumLogFiles());
@@ -632,7 +632,7 @@ public class TestHLog  {
     // Flush the second region, which removes all the remaining output files
     // since the oldest was completely flushed and the two others only contain
     // flush information
-    seqId = log.startCacheFlush();
+    seqId = log.startCacheFlush(hri2.getRegionName());
     log.completeCacheFlush(hri2.getRegionName(), tableName2, seqId, false);
     log.rollWriter();
     assertEquals(0, log.getNumLogFiles());

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1181924&r1=1181923&r2=1181924&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Tue Oct 11 17:42:18 2011
@@ -338,7 +338,7 @@ public class TestWALReplay {
     }
 
     // Add a cache flush, shouldn't have any effect
-    long logSeqId = wal.startCacheFlush();
+    long logSeqId = wal.startCacheFlush(regionName);
     wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
 
     // Add an edit to another family, should be skipped.