Posted to commits@hbase.apache.org by jd...@apache.org on 2009/05/20 19:06:30 UTC

svn commit: r776768 - in /hadoop/hbase/branches/0.19: CHANGES.txt src/java/org/apache/hadoop/hbase/HConstants.java src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Author: jdcryans
Date: Wed May 20 17:06:27 2009
New Revision: 776768

URL: http://svn.apache.org/viewvc?rev=776768&view=rev
Log:
HBASE-1430  Read the logs in batches during log splitting to avoid OOME
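
Before this change, splitLog() read the edits of every log file under the
dead server into memory before handing them to the writer threads, which
could exhaust the heap when there were many or large logs. The patch below
instead processes the files in batches of DEFAULT_NUMBER_CONCURRENT_LOG_READS
(10), writing out and releasing each batch's entries before reading the next.
For reference, a minimal standalone sketch of the batch arithmetic the patch
inlines in splitLog(); the class, method and variable names here are
illustrative only, not part of the patch:

    public class BatchRanges {
      // Computes the half-open [start, end) index ranges that splitLog()
      // walks: full batches of batchSize logs, with the final batch clamped
      // to the total number of log files.
      static int[][] batchRanges(int numLogs, int batchSize) {
        int maxSteps = (int) Math.ceil(numLogs / (double) batchSize);
        int[][] ranges = new int[maxSteps][2];
        for (int step = 0; step < maxSteps; step++) {
          int start = step * batchSize;
          int end = (step == maxSteps - 1) ? numLogs : start + batchSize;
          ranges[step] = new int[] { start, end };
        }
        return ranges;
      }

      public static void main(String[] args) {
        // 25 log files in batches of 10 -> [0, 10), [10, 20), [20, 25),
        // so at most 10 logs' worth of edits are buffered at any time.
        for (int[] r : batchRanges(25, 10)) {
          System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
      }
    }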

Modified:
    hadoop/hbase/branches/0.19/CHANGES.txt
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=776768&r1=776767&r2=776768&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Wed May 20 17:06:27 2009
@@ -27,6 +27,7 @@
                N minutes/hours
    HBASE-1420  add ability to add and remove table indexes on existing
                tables (Clint Morgan via Stack)
+   HBASE-1430  Read the logs in batches during log splitting to avoid OOME
 
 Release 0.19.2 - May 9th, 2009 
   BUG FIXES

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java?rev=776768&r1=776767&r2=776768&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java Wed May 20 17:06:27 2009
@@ -124,6 +124,10 @@
    *  to rewrite the logs. More means faster but bigger mem consumption */
   static final int DEFAULT_NUMBER_LOG_WRITER_THREAD = 10;
   
+  /** Default number of logs to read concurrently when splitting logs.
+   *  More means faster splitting but higher memory consumption */
+  static final int DEFAULT_NUMBER_CONCURRENT_LOG_READS = 10;
+  
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
   

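The new batch-size constant is hard-coded rather than exposed through the
site configuration. For illustration only, a tunable variant could follow
the usual Hadoop Configuration pattern sketched below, given the
Configuration conf already passed to splitLog() (HLog references HConstants
members unqualified, so the constant is in scope there). The property name
is invented for this sketch and does not exist in the patch:

    // Hypothetical lookup -- the property name is invented here; the patch
    // itself uses DEFAULT_NUMBER_CONCURRENT_LOG_READS directly.
    int concurrentLogReads = conf.getInt(
        "hbase.regionserver.hlog.splitlog.batch.size",
        DEFAULT_NUMBER_CONCURRENT_LOG_READS);
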
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=776768&r1=776767&r2=776768&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Wed May 20 17:06:27 2009
@@ -761,156 +761,163 @@
    */
   private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
       final FileSystem fs, final Configuration conf)
-  throws IOException {
-    final Map<byte [], SequenceFile.Writer> logWriters =
-      new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
-    final Map<byte[], LinkedList<HLogEntry>> logEntries = 
-      new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
-    try {
-      for (int i = 0; i < logfiles.length; i++) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
-              logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
-        }
-        // Check for possibly empty file. With appends, currently Hadoop reports
-        // a zero length even if the file has been sync'd. Revisit if 
-        // HADOOP-4751 is committed.
-        long length = logfiles[i].getLen();
-        HLogKey key = new HLogKey();
-        HLogEdit val = new HLogEdit();
-        SequenceFile.Reader in = null;
-        try {
-          in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
-          try {
-            int count = 0;
-            while (in.next(key, val)) {
-              byte[] regionName = key.getRegionName();
-              LinkedList<HLogEntry> queue = logEntries.get(regionName);
-              if (queue == null) {
-                queue = new LinkedList<HLogEntry>();
-                LOG.debug("Adding queue for " + Bytes.toString(regionName));
-                logEntries.put(regionName, queue);
-              }
-              queue.push(new HLogEntry(val, key));
-              count++;
-            }
-            LOG.debug("Pushed " + count + " entries");
-          } catch (IOException e) {
-            e = RemoteExceptionHandler.checkIOException(e);
-            if (!(e instanceof EOFException)) {
-              LOG.warn("Exception processing " + logfiles[i].getPath() +
-                  " -- continuing. Possible DATA LOSS!", e);
+    throws IOException {
+      final Map<byte [], SequenceFile.Writer> logWriters =
+        new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
+      
+      try {
+        int maxSteps = (int) Math.ceil((logfiles.length * 1.0) /
+            DEFAULT_NUMBER_CONCURRENT_LOG_READS);
+        for (int step = 0; step < maxSteps; step++) {
+          final Map<byte[], LinkedList<HLogEntry>> logEntries = 
+            new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
+          // Stop at logfiles.length when it's the last step
+          int endIndex = step == maxSteps - 1 ? logfiles.length : 
+            step*DEFAULT_NUMBER_CONCURRENT_LOG_READS + 
+            DEFAULT_NUMBER_CONCURRENT_LOG_READS;
+          for (int i = step * DEFAULT_NUMBER_CONCURRENT_LOG_READS;
+              i < endIndex; i++) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+                  ": " + logfiles[i].getPath() + 
+                  ", length=" + logfiles[i].getLen());
             }
-          }
-        } catch (IOException e) {
-          if (length <= 0) {
-            LOG.warn("Empty log, continuing: " + logfiles[i]);
-            continue;
-          }
-          throw e;
-        } finally {
-          try {
-            if (in != null) {
-              in.close();
+            // Check for possibly empty file. With appends, currently Hadoop 
+            // reports a zero length even if the file has been sync'd. Revisit if
+            // HADOOP-4751 is committed.
+            long length = logfiles[i].getLen();
+            HLogKey key = new HLogKey();
+            HLogEdit val = new HLogEdit();
+            SequenceFile.Reader in = null;
+            try {
+              in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
+              try {
+                int count = 0;
+                while (in.next(key, val)) {
+                  byte [] regionName = key.getRegionName();
+                  LinkedList<HLogEntry> queue = logEntries.get(regionName);
+                  if (queue == null) {
+                    queue = new LinkedList<HLogEntry>();
+                    LOG.debug("Adding queue for " + Bytes.toString(regionName));
+                    logEntries.put(regionName, queue);
+                  }
+                  queue.push(new HLogEntry(val, key));
+                  count++;
+                }
+                LOG.debug("Pushed " + count + " entries from " +
+                    logfiles[i].getPath());
+              } catch (IOException e) {
+                e = RemoteExceptionHandler.checkIOException(e);
+                if (!(e instanceof EOFException)) {
+                  LOG.warn("Exception processing " + logfiles[i].getPath() +
+                      " -- continuing. Possible DATA LOSS!", e);
+                }
+              }
+            } catch (IOException e) {
+              if (length <= 0) {
+                LOG.warn("Empty hlog, continuing: " + logfiles[i]);
+                continue;
+              }
+              throw e;
+            } finally {
+              try {
+                if (in != null) {
+                  in.close();
+                }
+              } catch (IOException e) {
+                LOG.warn("Close in finally threw exception -- continuing", e);
+              }
+              // Delete the input file now so we do not replay edits. We could
+              // have gotten here because of an exception. If so, there is
+              // probably nothing we can do about it. Replaying it could work,
+              // but we could be stuck replaying forever. Just continue, though
+              // we could have lost some edits.
+              fs.delete(logfiles[i].getPath(), true);
             }
-          } catch (IOException e) {
-            LOG.warn("Close in finally threw exception -- continuing", e);
           }
-          // Delete the input file now so we do not replay edits. We could
-          // have gotten here because of an exception. If so, probably
-          // nothing we can do about it. Replaying it, it could work but we
-          // could be stuck replaying for ever. Just continue though we
-          // could have lost some edits.
-          fs.delete(logfiles[i].getPath(), true);
-        }
-      }
-      ExecutorService threadPool = 
-        Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
-      for (final byte[] key : logEntries.keySet()) {
+          ExecutorService threadPool = 
+            Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
+          for (final byte[] key : logEntries.keySet()) {
 
-        Thread thread = new Thread(Bytes.toString(key)) {
-          public void run() {
-            LinkedList<HLogEntry> entries = logEntries.get(key);
-            LOG.debug("Thread got " + entries.size() + " to process");
-            long threadTime = System.currentTimeMillis();
-            try {
-              int count = 0;
-              for (HLogEntry logEntry : entries) {
-                SequenceFile.Writer w = logWriters.get(key);
-                if (w == null) {
-                  Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
-                      .getTableDir(rootDir, logEntry.getKey().getTablename()),
-                      HRegionInfo.encodeRegionName(key)),
-                      HREGION_OLDLOGFILE_NAME);
-                  Path oldlogfile = null;
-                  SequenceFile.Reader old = null;
-                  if (fs.exists(logfile)) {
-                    LOG.warn("Old log file " + logfile
-                        + " already exists. Copying existing file to new file");
-                    oldlogfile = new Path(logfile.toString() + ".old");
-                    fs.rename(logfile, oldlogfile);
-                    old = new SequenceFile.Reader(fs, oldlogfile, conf);
-                  }
-                  w = SequenceFile.createWriter(fs, conf, logfile,
-                      HLogKey.class, HLogEdit.class, getCompressionType(conf));
-                  // Use copy of regionName; regionName object is reused inside
-                  // in
-                  // HStoreKey.getRegionName so its content changes as we
-                  // iterate.
-                  logWriters.put(key, w);
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("Creating new log file writer for path "
-                        + logfile + " and region " + Bytes.toString(key));
-                  }
+            Thread thread = new Thread(Bytes.toString(key)) {
+              public void run() {
+                LinkedList<HLogEntry> entries = logEntries.get(key);
+                LOG.debug("Thread got " + entries.size() + " to process");
+                long threadTime = System.currentTimeMillis();
+                try {
+                  int count = 0;
+                  for (HLogEntry logEntry : entries) {
+                    SequenceFile.Writer w = logWriters.get(key);
+                    if (w == null) {
+                      Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+                          .getTableDir(rootDir, logEntry.getKey().getTablename()),
+                          HRegionInfo.encodeRegionName(key)),
+                          HREGION_OLDLOGFILE_NAME);
+                      Path oldlogfile = null;
+                      SequenceFile.Reader old = null;
+                      if (fs.exists(logfile)) {
+                        LOG.warn("Old hlog file " + logfile
+                          + " already exists. Copying existing file to new file");
+                        oldlogfile = new Path(logfile.toString() + ".old");
+                        fs.rename(logfile, oldlogfile);
+                        old = new SequenceFile.Reader(fs, oldlogfile, conf);
+                      }
+                      w = SequenceFile.createWriter(fs, conf, logfile,
+                          HLogKey.class, HLogEdit.class, getCompressionType(conf));
+                      logWriters.put(key, w);
+                      if (LOG.isDebugEnabled()) {
+                        LOG.debug("Creating new hlog file writer for path "
+                            + logfile + " and region " + Bytes.toString(key));
+                      }
 
-                  if (old != null) {
-                    // Copy from existing log file
-                    HLogKey oldkey = new HLogKey();
-                    HLogEdit oldval = new HLogEdit();
-                    for (; old.next(oldkey, oldval); count++) {
-                      if (LOG.isDebugEnabled() && count > 0
-                          && count % 10000 == 0) {
-                        LOG.debug("Copied " + count + " edits");
+                      if (old != null) {
+                        // Copy from existing log file
+                        HLogKey oldkey = new HLogKey();
+                        HLogEdit oldval = new HLogEdit();
+                        for (; old.next(oldkey, oldval); count++) {
+                          if (LOG.isDebugEnabled() && count > 0
+                              && count % 10000 == 0) {
+                            LOG.debug("Copied " + count + " edits");
+                          }
+                          w.append(oldkey, oldval);
+                        }
+                        old.close();
+                        fs.delete(oldlogfile, true);
                       }
-                      w.append(oldkey, oldval);
                     }
-                    old.close();
-                    fs.delete(oldlogfile, true);
+                    w.append(logEntry.getKey(), logEntry.getEdit());
+                    count++;
+                  }
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Applied " + count + " total edits to "
+                        + Bytes.toString(key) + " in "
+                        + (System.currentTimeMillis() - threadTime) + "ms");
                   }
+                } catch (IOException e) {
+                  e = RemoteExceptionHandler.checkIOException(e);
+                  LOG.warn("Exception while writing log for region "
+                      + Bytes.toString(key), e);
                 }
-                w.append(logEntry.getKey(), logEntry.getEdit());
-                count++;
               }
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Applied " + count + " total edits to "
-                    + Bytes.toString(key) + " in "
-                    + (System.currentTimeMillis() - threadTime) + "ms");
-              }
-            } catch (IOException e) {
-              e = RemoteExceptionHandler.checkIOException(e);
-              LOG.warn("Got while writing region " + Bytes.toString(key)
-                  + " log " + e);
-              e.printStackTrace();
+            };
+            threadPool.execute(thread);
+          }
+          threadPool.shutdown();
+          // Wait for all threads to terminate
+          try {
+            for (int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
+              LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
             }
+          } catch (InterruptedException ex) {
+            LOG.warn("Hlog writers were interrupted, possible data loss!");
           }
-        };
-        threadPool.execute(thread);
-      }
-      threadPool.shutdown();
-      // Wait for all threads to terminate
-      try {
-        for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
-          LOG.debug("Waiting for log writers to terminate, iteration #" + i);
         }
-      }catch(InterruptedException ex) {
-        LOG.warn("Log writers were interrupted, possible data loss!");
-      }
-    } finally {
-      for (SequenceFile.Writer w : logWriters.values()) {
-        w.close();
+      } finally {
+        for (SequenceFile.Writer w : logWriters.values()) {
+          w.close();
+        }
       }
     }
-  }
   
   /**
    * Utility class that lets us keep track of the edit with its key
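
For each batch, splitLog() creates a fixed pool of
DEFAULT_NUMBER_LOG_WRITER_THREAD (10) writer threads, submits one task per
region, and then blocks until the pool drains before reading the next batch
of logs. A generic, self-contained sketch of that shutdown/awaitTermination
idiom follows; it is not the committed HLog code, and the task bodies are
placeholders:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class BatchedWriterPool {
      public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int region = 0; region < 4; region++) {
          final int r = region;                   // one task per region
          threadPool.execute(new Runnable() {
            public void run() {
              System.out.println("writing edits for region " + r);
            }
          });
        }
        threadPool.shutdown();                    // no new tasks accepted
        try {
          // Poll in 5-second slices, as the loop in splitLog() does.
          for (int i = 0;
              !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
            System.out.println("waiting for writers, iteration #" + i);
          }
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();     // restore interrupt status
        }
      }
    }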