Posted to commits@hbase.apache.org by jd...@apache.org on 2009/05/15 15:45:20 UTC

svn commit: r775133 - in /hadoop/hbase/branches/0.19: CHANGES.txt src/java/org/apache/hadoop/hbase/HConstants.java src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Author: jdcryans
Date: Fri May 15 13:45:20 2009
New Revision: 775133

URL: http://svn.apache.org/viewvc?rev=775133&view=rev
Log:
HBASE-1008  [performance] The replay of logs on server crash takes way too long
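
In short, instead of appending each edit to its per-region log file while
scanning the dead server's logs, the patch first buffers the edits per region
in memory and then rewrites each region's log on its own thread, drawing from
a fixed-size pool of DEFAULT_NUMBER_LOG_WRITER_THREAD (10) threads. The
following is a minimal, self-contained sketch of that bucket-then-write-in-parallel
pattern, not the committed code: the Edit class and writeRegionLog() helper are
placeholders, whereas the real patch (see the HLog.java diff below) works on
HLogKey/HLogEdit pairs and SequenceFile writers.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelLogSplitSketch {

  /** Placeholder for one logged edit; HBase pairs an HLogKey with an HLogEdit. */
  static class Edit {
    final String region;
    final String payload;
    Edit(String region, String payload) {
      this.region = region;
      this.payload = payload;
    }
  }

  static void splitAndRewrite(List<Edit> edits, int writerThreads)
      throws InterruptedException {
    // Phase 1: bucket every edit by region, preserving order within a region.
    Map<String, List<Edit>> perRegion = new TreeMap<String, List<Edit>>();
    for (Edit e : edits) {
      List<Edit> queue = perRegion.get(e.region);
      if (queue == null) {
        queue = new ArrayList<Edit>();
        perRegion.put(e.region, queue);
      }
      queue.add(e);
    }
    // Phase 2: rewrite each region's edits on its own thread from a fixed pool.
    // More threads means faster splitting but more memory held at once.
    ExecutorService pool = Executors.newFixedThreadPool(writerThreads);
    for (final Map.Entry<String, List<Edit>> entry : perRegion.entrySet()) {
      pool.execute(new Runnable() {
        public void run() {
          writeRegionLog(entry.getKey(), entry.getValue());
        }
      });
    }
    pool.shutdown();
    // Block until every per-region writer has finished, as splitLog() does.
    while (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
      System.out.println("Waiting for log writers to terminate...");
    }
  }

  /** Placeholder for "append these edits to the region's old log file". */
  static void writeRegionLog(String region, List<Edit> regionEdits) {
    System.out.println(region + ": rewrote " + regionEdits.size() + " edits");
  }

  public static void main(String[] args) throws InterruptedException {
    List<Edit> edits = new ArrayList<Edit>();
    edits.add(new Edit("region-a", "put row1"));
    edits.add(new Edit("region-b", "put row2"));
    edits.add(new Edit("region-a", "delete row3"));
    splitAndRewrite(edits, 10);
  }
}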

Modified:
    hadoop/hbase/branches/0.19/CHANGES.txt
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=775133&r1=775132&r2=775133&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Fri May 15 13:45:20 2009
@@ -1,5 +1,5 @@
 HBase Change Log
-Release 0.19.2 - Unreleased
+Release 0.19.3 - Unreleased
   BUG FIXES
    HBASE-1413  fall back to filesystem block size default if
                hbase.regionserver.hlog.blocksize is not specified
@@ -12,6 +12,7 @@
    HBASE-1418  Transactional improvements and fixes (Clint Morgan via Stack)
    HBASE-1424  have shell print regioninfo and location on first load if
                DEBUG enabled
+   HBASE-1008  [performance] The replay of logs on server crash takes way too long
 
 Release 0.19.2 - May 9th, 2009 
   BUG FIXES

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java?rev=775133&r1=775132&r2=775133&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java Fri May 15 13:45:20 2009
@@ -120,6 +120,10 @@
   /** Default size of a reservation block   */
   static final int DEFAULT_SIZE_RESERVATION_BLOCK = 1024 * 1024 * 5;
   
+  /** Default number of threads used to rewrite the logs during log splitting.
+   *  More threads means faster splitting but higher memory consumption. */
+  static final int DEFAULT_NUMBER_LOG_WRITER_THREAD = 10;
+  
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
   

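The new constant is consumed directly by Executors.newFixedThreadPool() in the
HLog.java hunk below. If the thread count ever needed to be tuned per cluster,
the following sketch shows what a configurable variant might look like at the
point where the pool is created inside splitLog() (where both conf and the
constant are in scope); the property name is purely illustrative and is not
defined by this commit:

  // Hypothetical: let the site configuration override the writer-thread count.
  // "hbase.regionserver.hlog.writer.threads" is an illustrative name only;
  // the committed patch uses DEFAULT_NUMBER_LOG_WRITER_THREAD (10) as-is.
  int writerThreads = conf.getInt("hbase.regionserver.hlog.writer.threads",
      DEFAULT_NUMBER_LOG_WRITER_THREAD);
  ExecutorService threadPool = Executors.newFixedThreadPool(writerThreads);
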
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775133&r1=775132&r2=775133&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Fri May 15 13:45:20 2009
@@ -25,10 +25,14 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -718,6 +722,7 @@
   public static void splitLog(final Path rootDir, final Path srcDir,
       final FileSystem fs, final Configuration conf)
   throws IOException {
+    long millis = System.currentTimeMillis();
     if (!fs.exists(srcDir)) {
       // Nothing to do
       return;
@@ -738,7 +743,9 @@
       io.initCause(e);
       throw io;
     }
-    LOG.info("log file splitting completed for " + srcDir.toString());
+    long endMillis = System.currentTimeMillis();
+    LOG.info("log file splitting completed in " + (endMillis - millis) +
+        " millis for " + srcDir.toString());
   }
   
   /*
@@ -749,101 +756,150 @@
    * @throws IOException
    */
   private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
-    final FileSystem fs, final Configuration conf)
+      final FileSystem fs, final Configuration conf)
   throws IOException {
-    Map<byte [], SequenceFile.Writer> logWriters =
+    final Map<byte [], SequenceFile.Writer> logWriters =
       new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
+    final Map<byte[], LinkedList<HLogEntry>> logEntries = 
+      new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
-            logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+              logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
         }
         // Check for possibly empty file. With appends, currently Hadoop reports
         // a zero length even if the file has been sync'd. Revisit if 
         // HADOOP-4751 is committed.
-        boolean possiblyEmpty = logfiles[i].getLen() <= 0;
+        long length = logfiles[i].getLen();
         HLogKey key = new HLogKey();
         HLogEdit val = new HLogEdit();
+        SequenceFile.Reader in = null;
         try {
-          SequenceFile.Reader in =
-            new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
+          in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
           try {
             int count = 0;
-            for (; in.next(key, val); count++) {
-              byte [] tableName = key.getTablename();
-              byte [] regionName = key.getRegionName();
-              SequenceFile.Writer w = logWriters.get(regionName);
-              if (w == null) {
-                Path logfile = new Path(
-                    HRegion.getRegionDir(
-                        HTableDescriptor.getTableDir(rootDir, tableName),
-                        HRegionInfo.encodeRegionName(regionName)),
-                        HREGION_OLDLOGFILE_NAME);
-                Path oldlogfile = null;
-                SequenceFile.Reader old = null;
-                if (fs.exists(logfile)) {
-                  LOG.warn("Old log file " + logfile +
-                  " already exists. Copying existing file to new file");
-                  oldlogfile = new Path(logfile.toString() + ".old");
-                  fs.rename(logfile, oldlogfile);
-                  old = new SequenceFile.Reader(fs, oldlogfile, conf);
-                }
-                w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
-                    HLogEdit.class, getCompressionType(conf));
-                // Use copy of regionName; regionName object is reused inside in
-                // HStoreKey.getRegionName so its content changes as we iterate.
-                logWriters.put(regionName, w);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Creating new log file writer for path " + logfile +
-                      " and region " + Bytes.toString(regionName));
-                }
-
-                if (old != null) {
-                  // Copy from existing log file
-                  HLogKey oldkey = new HLogKey();
-                  HLogEdit oldval = new HLogEdit();
-                  for (; old.next(oldkey, oldval); count++) {
-                    if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
-                      LOG.debug("Copied " + count + " edits");
-                    }
-                    w.append(oldkey, oldval);
-                  }
-                  old.close();
-                  fs.delete(oldlogfile, true);
-                }
+            while (in.next(key, val)) {
+              byte[] regionName = key.getRegionName();
+              LinkedList<HLogEntry> queue = logEntries.get(regionName);
+              if (queue == null) {
+                queue = new LinkedList<HLogEntry>();
+                LOG.debug("Adding queue for " + Bytes.toString(regionName));
+                logEntries.put(regionName, queue);
               }
-              w.append(key, val);
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Applied " + count + " total edits from " +
-                  logfiles[i].getPath().toString());
+              queue.push(new HLogEntry(val, key));
+              count++;
             }
+            LOG.debug("Pushed " + count + " entries");
           } catch (IOException e) {
             e = RemoteExceptionHandler.checkIOException(e);
             if (!(e instanceof EOFException)) {
               LOG.warn("Exception processing " + logfiles[i].getPath() +
                   " -- continuing. Possible DATA LOSS!", e);
             }
-          } finally {
-            try {
-              in.close();
-            } catch (IOException e) {
-              LOG.warn("Close in finally threw exception -- continuing", e);
-            }
-            // Delete the input file now so we do not replay edits.  We could
-            // have gotten here because of an exception.  If so, probably
-            // nothing we can do about it. Replaying it, it could work but we
-            // could be stuck replaying for ever. Just continue though we
-            // could have lost some edits.
-            fs.delete(logfiles[i].getPath(), true);
           }
         } catch (IOException e) {
-          if (possiblyEmpty) {
+          if (length <= 0) {
+            LOG.warn("Empty log, continuing: " + logfiles[i]);
             continue;
           }
           throw e;
+        } finally {
+          try {
+            if (in != null) {
+              in.close();
+            }
+          } catch (IOException e) {
+            LOG.warn("Close in finally threw exception -- continuing", e);
+          }
+          // Delete the input file now so we do not replay edits. We could
+          // have gotten here because of an exception. If so, probably
+          // nothing we can do about it. Replaying it could work, but we
+          // could be stuck replaying forever. Just continue, though we
+          // could have lost some edits.
+          fs.delete(logfiles[i].getPath(), true);
+        }
+      }
+      ExecutorService threadPool = 
+        Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
+      for (final byte[] key : logEntries.keySet()) {
+
+        Thread thread = new Thread(Bytes.toString(key)) {
+          public void run() {
+            LinkedList<HLogEntry> entries = logEntries.get(key);
+            LOG.debug("Thread got " + entries.size() + " to process");
+            long threadTime = System.currentTimeMillis();
+            try {
+              int count = 0;
+              for (HLogEntry logEntry : entries) {
+                SequenceFile.Writer w = logWriters.get(key);
+                if (w == null) {
+                  Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+                      .getTableDir(rootDir, logEntry.getKey().getTablename()),
+                      HRegionInfo.encodeRegionName(key)),
+                      HREGION_OLDLOGFILE_NAME);
+                  Path oldlogfile = null;
+                  SequenceFile.Reader old = null;
+                  if (fs.exists(logfile)) {
+                    LOG.warn("Old log file " + logfile
+                        + " already exists. Copying existing file to new file");
+                    oldlogfile = new Path(logfile.toString() + ".old");
+                    fs.rename(logfile, oldlogfile);
+                    old = new SequenceFile.Reader(fs, oldlogfile, conf);
+                  }
+                  w = SequenceFile.createWriter(fs, conf, logfile,
+                      HLogKey.class, HLogEdit.class, getCompressionType(conf));
+                  // Use copy of regionName; regionName object is reused inside
+                  // HStoreKey.getRegionName so its content changes as we
+                  // iterate.
+                  logWriters.put(key, w);
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Creating new log file writer for path "
+                        + logfile + " and region " + Bytes.toString(key));
+                  }
+
+                  if (old != null) {
+                    // Copy from existing log file
+                    HLogKey oldkey = new HLogKey();
+                    HLogEdit oldval = new HLogEdit();
+                    for (; old.next(oldkey, oldval); count++) {
+                      if (LOG.isDebugEnabled() && count > 0
+                          && count % 10000 == 0) {
+                        LOG.debug("Copied " + count + " edits");
+                      }
+                      w.append(oldkey, oldval);
+                    }
+                    old.close();
+                    fs.delete(oldlogfile, true);
+                  }
+                }
+                w.append(logEntry.getKey(), logEntry.getEdit());
+                count++;
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Applied " + count + " total edits to "
+                    + Bytes.toString(key) + " in "
+                    + (System.currentTimeMillis() - threadTime) + "ms");
+              }
+            } catch (IOException e) {
+              e = RemoteExceptionHandler.checkIOException(e);
+              LOG.warn("Exception while writing region " + Bytes.toString(key)
+                  + " log " + e);
+              e.printStackTrace();
+            }
+          }
+        };
+        threadPool.execute(thread);
+      }
+      threadPool.shutdown();
+      // Wait for all threads to terminate
+      try {
+        for (int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
+          LOG.debug("Waiting for log writers to terminate, iteration #" + i);
         }
+      } catch (InterruptedException ex) {
+        LOG.warn("Log writers were interrupted, possible data loss!");
       }
     } finally {
       for (SequenceFile.Writer w : logWriters.values()) {
@@ -851,6 +907,40 @@
       }
     }
   }
+  
+  /**
+   * Utility class that lets us keep track of the edit with its key.
+   * Only used when splitting logs.
+   */
+  public static class HLogEntry {
+    private HLogEdit edit;
+    private HLogKey key;
+    /**
+     * Constructor taking both the edit and its key
+     * @param edit log's edit
+     * @param key log's key
+     */
+    public HLogEntry(HLogEdit edit, HLogKey key) {
+      super();
+      this.edit = edit;
+      this.key = key;
+    }
+    /**
+     * Gets the edit
+     * @return edit
+     */
+    public HLogEdit getEdit() {
+      return edit;
+    }
+    /**
+     * Gets the key
+     * @return key
+     */
+    public HLogKey getKey() {
+      return key;
+    }
+
+  }
 
   /**
    * Construct the HLog directory name
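
For reference, the public entry point reworked above is the static
splitLog(rootDir, srcDir, fs, conf) shown in the first HLog.java hunk; callers
(typically the master, while recovering from a region server crash) point it
at the dead server's log directory and it rewrites every edit into the
per-region HREGION_OLDLOGFILE_NAME files, now using up to ten writer threads.
A minimal caller sketch follows; the hbase.rootdir lookup matches the 0.19 API,
but the literal srcDir name is illustrative only (real callers derive it from
the crashed server's HLog directory name).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.HLog;

public class SplitLogCaller {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    FileSystem fs = FileSystem.get(conf);
    // The HBase root directory and the crashed server's log directory under
    // it. The directory name below is made up for illustration.
    Path rootDir = new Path(conf.get("hbase.rootdir", "/hbase"));
    Path srcDir = new Path(rootDir, "log_10.0.0.1_1234567890_60020");
    // Splits the logs into per-region files, now writing up to
    // DEFAULT_NUMBER_LOG_WRITER_THREAD regions' logs concurrently.
    HLog.splitLog(rootDir, srcDir, fs, conf);
  }
}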