You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/01/03 05:00:23 UTC

svn commit: r895330 - in /hadoop/hbase/branches/0.20: CHANGES.txt src/java/org/apache/hadoop/hbase/regionserver/HLog.java src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java

Author: stack
Date: Sun Jan  3 04:00:23 2010
New Revision: 895330

URL: http://svn.apache.org/viewvc?rev=895330&view=rev
Log:
HBASE-2053 Upper bound of outstanding WALs can be overrun

Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=895330&r1=895329&r2=895330&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Sun Jan  3 04:00:23 2010
@@ -79,6 +79,7 @@
    HBASE-2080  [EC2] Support multivolume local instance storage
    HBASE-2083  [EC2] HDFS DataNode no longer required on master
    HBASE-2084  [EC2] JAVA_HOME handling broken
+   HBASE-2053  Upper bound of outstanding WALs can be overrun
 
 Release 0.20.2 - November 18th, 2009
   INCOMPATIBLE CHANGES

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=895330&r1=895329&r2=895330&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sun Jan  3 04:00:23 2010
@@ -45,6 +45,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -54,17 +55,16 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.fs.FSDataOutputStream;
 
 /**
  * HLog stores all the edits to the HStore.
@@ -130,7 +130,7 @@
     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
   /*
-   * Map of region to last sequence/edit id. 
+   * Map of regions to first sequence/edit id in their memstore.
    */
   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
@@ -321,21 +321,21 @@
    * cacheFlushLock and then completeCacheFlush could be called which would wait
    * for the lock on this and consequently never release the cacheFlushLock
    *
-   * @return If lots of logs, flush the returned region so next time through
+   * @return If lots of logs, flush the returned regions so next time through
    * we can clean logs. Returns null if nothing to flush.
    * @throws FailedLogCloseException
    * @throws IOException
    */
-  public byte [] rollWriter() throws FailedLogCloseException, IOException {
+  public byte [][] rollWriter() throws FailedLogCloseException, IOException {
     // Return if nothing to flush.
     if (this.writer != null && this.numEntries.get() <= 0) {
       return null;
     }
-    byte [] regionToFlush = null;
+    byte [][] regionsToFlush = null;
     this.cacheFlushLock.lock();
     try {
       if (closed) {
-        return regionToFlush;
+        return regionsToFlush;
       }
       synchronized (updateLock) {
         // Clean up current writer.
@@ -361,7 +361,7 @@
             }
             this.outputfiles.clear();
           } else {
-            regionToFlush = cleanOldLogs();
+            regionsToFlush = cleanOldLogs();
           }
         }
         this.numEntries.set(0);
@@ -371,7 +371,7 @@
     } finally {
       this.cacheFlushLock.unlock();
     }
-    return regionToFlush;
+    return regionsToFlush;
   }
 
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
@@ -394,8 +394,7 @@
    * we can clean logs. Returns null if nothing to flush.
    * @throws IOException
    */
-  private byte [] cleanOldLogs() throws IOException {
-    byte [] regionToFlush = null;
+  private byte [][] cleanOldLogs() throws IOException {
     Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
     // Get the set of all log files whose final ID is older than or
     // equal to the oldest pending region operation
@@ -403,29 +402,60 @@
       new TreeSet<Long>(this.outputfiles.headMap(
         (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
     // Now remove old log files (if any)
-    byte [] oldestRegion = null;
-    if (LOG.isDebugEnabled()) {
-      // Find region associated with oldest key -- helps debugging.
-      oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-      LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
-        " out of total " + this.outputfiles.size() + "; " +
-        "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
-        " from region " + Bytes.toStringBinary(oldestRegion));
-    }
-    if (sequenceNumbers.size() > 0) {
+    int logsToRemove = sequenceNumbers.size();
+    if (logsToRemove > 0) {
+      if (LOG.isDebugEnabled()) {
+        // Find associated region; helps debugging.
+        byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
+        LOG.debug("Found " + logsToRemove + " hlogs to remove " +
+          " out of total " + this.outputfiles.size() + "; " +
+          "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
+          " from region " + Bytes.toString(oldestRegion));
+      }
       for (Long seq : sequenceNumbers) {
         deleteLogFile(this.outputfiles.remove(seq), seq);
       }
     }
-    int countOfLogs = this.outputfiles.size() - sequenceNumbers.size();
-    if (countOfLogs > this.maxLogs) {
-      regionToFlush = oldestRegion != null?
-        oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
-      LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
-        this.maxLogs + "; forcing flush of region with oldest edits: " +
-        Bytes.toStringBinary(regionToFlush));
+
+    // If too many log files, figure which regions we need to flush.
+    byte [][] regions = null;
+    int logCount = this.outputfiles.size() - logsToRemove;
+    if (logCount > this.maxLogs && this.outputfiles != null &&
+        this.outputfiles.size() > 0) {
+      regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(),
+        this.lastSeqWritten);
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < regions.length; i++) {
+        if (i > 0) sb.append(", ");
+        sb.append(Bytes.toStringBinary(regions[i]));
+      }
+      LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
+        this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
+        sb.toString());
+    }
+    return regions;
+  }
+
+  /**
+   * Return regions (memstores) that have edits that are less than the passed
+   * <code>oldestWALseqid</code>.
+   * @param oldestWALseqid
+   * @param regionsToSeqids
+   * @return All regions whose seqid is less than <code>oldestWALseqid</code> (Not
+   * necessarily in order).  Null if no regions found.
+   */
+  static byte [][] findMemstoresWithEditsOlderThan(final long oldestWALseqid,
+      final Map<byte [], Long> regionsToSeqids) {
+    // This method is static so it can be unit tested more easily.
+    List<byte []> regions = null;
+    for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
+      if (e.getValue().longValue() < oldestWALseqid) {
+        if (regions == null) regions = new ArrayList<byte []>();
+        regions.add(e.getKey());
+      }
     }
-    return regionToFlush;
+    return regions == null?
+      null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
   }
 
   /*
@@ -568,7 +598,8 @@
       long seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
       // The 'lastSeqWritten' map holds the sequence number of the oldest
-      // write for each region. When the cache is flushed, the entry for the
+      // write for each region (i.e. the first edit added to the particular
+      // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
@@ -617,7 +648,8 @@
     long seqNum [] = obtainSeqNum(edits.size());
     synchronized (this.updateLock) {
       // The 'lastSeqWritten' map holds the sequence number of the oldest
-      // write for each region. When the cache is flushed, the entry for the
+      // write for each region (i.e. the first edit added to the particular
+      // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=895330&r1=895329&r2=895330&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Sun Jan  3 04:00:23 2010
@@ -77,9 +77,9 @@
       rollLock.lock();          // Don't interrupt us. We're working
       try {
         this.lastrolltime = now;
-        byte [] regionToFlush = server.getLog().rollWriter();
-        if (regionToFlush != null) {
-          scheduleFlush(regionToFlush);
+        byte [][] regionsToFlush = server.getLog().rollWriter();
+        if (regionsToFlush != null) {
+          for (byte [] r: regionsToFlush) scheduleFlush(r);
         }
       } catch (FailedLogCloseException e) {
         LOG.fatal("Forcing server shutdown", e);
@@ -142,4 +142,4 @@
       rollLock.unlock();
     }
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=895330&r1=895329&r2=895330&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Sun Jan  3 04:00:23 2010
@@ -21,7 +21,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
@@ -59,6 +61,31 @@
     shutdownDfs(cluster);
     super.tearDown();
   }
+
+  /**
+   * Test the findMemstoresWithEditsOlderThan method.
+   * @throws IOException
+   */
+  public void testFindMemstoresWithEditsOlderThan() throws IOException {
+    Map<byte [], Long> regionsToSeqids = new HashMap<byte [], Long>();
+    for (int i = 0; i < 10; i++) {
+      Long l = new Long(i);
+      regionsToSeqids.put(l.toString().getBytes(), l);
+    }
+    byte [][] regions =
+      HLog.findMemstoresWithEditsOlderThan(1, regionsToSeqids);
+    assertEquals(1, regions.length);
+    assertTrue(Bytes.equals(regions[0], "0".getBytes()));
+    regions = HLog.findMemstoresWithEditsOlderThan(3, regionsToSeqids);
+    int count = 3;
+    assertEquals(count, regions.length);
+    // Regions returned are not ordered.
+    for (int i = 0; i < count; i++) {
+      assertTrue(Bytes.equals(regions[i], "0".getBytes()) ||
+        Bytes.equals(regions[i], "1".getBytes()) ||
+        Bytes.equals(regions[i], "2".getBytes()));
+    }
+  }
  
   /**
    * Just write multiple logs then split.  Before fix for HADOOP-2283, this
@@ -184,5 +211,4 @@
       }
     }
   }
-
 }
\ No newline at end of file