Posted to commits@hbase.apache.org by st...@apache.org on 2010/12/20 21:36:18 UTC

svn commit: r1051279 - in /hbase/branches/0.90: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: stack
Date: Mon Dec 20 20:36:17 2010
New Revision: 1051279

URL: http://svn.apache.org/viewvc?rev=1051279&view=rev
Log:
HBASE-3323 OOME in master splitting logs
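
The calling convention changes with this patch: the filesystem and directory
arguments move from splitLog() into the createLogSplitter() factory, and each
HLogSplitter instance may be used only once. A minimal sketch of the new call
pattern, with illustrative paths (a real caller derives these from the
master's root directory):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;

    public class SplitLogSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path rootDir = new Path("/hbase");                     // illustrative
        Path logDir = new Path(rootDir, ".logs/host,60020,1"); // illustrative
        Path oldLogDir = new Path(rootDir, ".oldlogs");
        // Directories bind at construction; splitLog() now takes no arguments.
        HLogSplitter splitter =
            HLogSplitter.createLogSplitter(conf, rootDir, logDir, oldLogDir, fs);
        List<Path> splits = splitter.splitLog(); // one use per instance
        System.out.println("Wrote " + splits.size() + " recovered-edits files");
      }
    }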

Modified:
    hbase/branches/0.90/CHANGES.txt
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Mon Dec 20 20:36:17 2010
@@ -761,6 +761,7 @@ Release 0.90.0 - Unreleased
    HBASE-3370  ReplicationSource.openReader fails to locate HLogs when they
                aren't split yet
    HBASE-3371  Race in TestReplication can make it fail
+   HBASE-3323  OOME in master splitting logs
 
 
   IMPROVEMENTS

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Mon Dec 20 20:36:17 2010
@@ -190,12 +190,17 @@ public class MasterFileSystem {
     long splitTime = 0, splitLogSize = 0;
     Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
     try {
-      HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
+      HLogSplitter splitter = HLogSplitter.createLogSplitter(
+        conf, rootdir, logDir, oldLogDir, this.fs);
       try {
-        splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+        splitter.splitLog();
       } catch (OrphanHLogAfterSplitException e) {
         LOG.warn("Retrying splitting because of:", e);
-        splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+        // An HLogSplitter instance may only be used once; create a fresh
+        // one for the retry rather than calling splitLog() again.
+        splitter = HLogSplitter.createLogSplitter(
+          conf, rootdir, logDir, oldLogDir, this.fs);
+        splitter.splitLog();
       }
       splitTime = splitter.getTime();
       splitLogSize = splitter.getSize();

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Dec 20 20:36:17 2010
@@ -1439,8 +1439,9 @@ public class HLog implements Syncable {
       throw new IOException(p + " is not a directory");
     }
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(baseDir, p, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
+        conf, baseDir, p, oldLogDir, fs);
+    logSplitter.splitLog();
   }
 
   /**

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Mon Dec 20 20:36:17 2010
@@ -160,6 +160,32 @@ public class HLogKey implements Writable
     return result;
   }
 
+  /**
+   * Drop this instance's tablename byte array and instead
+   * hold a reference to the provided tablename. This is not
+   * meant to be a general purpose setter - it's only used
+   * to collapse references to conserve memory.
+   */
+  void internTableName(byte []tablename) {
+    // We should not use this as a setter - only to swap
+    // in a new reference to the same table name.
+    assert Bytes.equals(tablename, this.tablename);
+    this.tablename = tablename;
+  }
+
+  /**
+   * Drop this instance's region name byte array and instead
+   * hold a reference to the provided region name. This is not
+   * meant to be a general purpose setter - it's only used
+   * to collapse references to conserve memory.
+   */
+  void internEncodedRegionName(byte []encodedRegionName) {
+    // We should not use this as a setter - only to swap
+    // in a new reference to the same region name.
+    assert Bytes.equals(this.encodedRegionName, encodedRegionName);
+    this.encodedRegionName = encodedRegionName;
+  }
+
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.encodedRegionName);
     Bytes.writeByteArray(out, this.tablename);
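
Every HLogKey deserialized from a log file arrives with its own copies of the
table and region name arrays, so buffering millions of edits would otherwise
hold millions of identical byte arrays on the master's heap. A hedged sketch
of the pattern RegionEntryBuffer (below) applies with these helpers; the
intern methods are package-private, so this only compiles inside
org.apache.hadoop.hbase.regionserver.wal, and the entries list is illustrative:

    // Collapse each key's private name copies onto one shared pair of arrays,
    // letting the duplicate arrays become garbage.
    byte[] sharedTable = entries.get(0).getKey().getTablename();
    byte[] sharedRegion = entries.get(0).getKey().getEncodedRegionName();
    for (HLog.Entry entry : entries) {
      entry.getKey().internTableName(sharedTable);
      entry.getKey().internEncodedRegionName(sharedRegion);
    }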

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Mon Dec 20 20:36:17 2010
@@ -23,21 +23,18 @@ import static org.apache.hadoop.hbase.ut
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +42,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -53,9 +51,11 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.MultipleIOException;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -66,74 +66,109 @@ public class HLogSplitter {
 
   private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
 
-  static final Log LOG = LogFactory.getLog(HLogSplitter.class);
-
-  private long splitTime = 0;
-  private long splitSize = 0;
-
   /**
    * Name of file that holds recovered edits written by the wal log splitting
    * code, one per region
    */
   public static final String RECOVERED_EDITS = "recovered.edits";
 
+  
+  static final Log LOG = LogFactory.getLog(HLogSplitter.class);
+
+  private boolean hasSplit = false;
+  private long splitTime = 0;
+  private long splitSize = 0;
+
+
+  // Parameters for split process
+  protected final Path rootDir;
+  protected final Path srcDir;
+  protected final Path oldLogDir;
+  protected final FileSystem fs;
+  protected final Configuration conf;
+  
+  // Major subcomponents of the split process.
+  // These are separated into inner classes to make testing easier.
+  OutputSink outputSink;
+  EntryBuffers entryBuffers;
+
+  // If an exception is thrown by one of the other threads, it will be
+  // stored here.
+  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+  // Wait/notify for when data has been produced by the reader thread,
+  // consumed by the writer threads, or an exception occurred
+  Object dataAvailable = new Object();
+
+  
   /**
    * Create a new HLogSplitter using the given {@link Configuration} and the
   * <code>hbase.hlog.splitter.impl</code> property to derive the instance
    * class to use.
-   * 
-   * @param conf
-   * @return New HLogSplitter instance
-   */
-  public static HLogSplitter createLogSplitter(Configuration conf) {
+   *
+   * @param rootDir hbase directory
+   * @param srcDir logs directory
+   * @param oldLogDir directory where processed logs are archived to
+   * @param fs FileSystem
+   */
+  public static HLogSplitter createLogSplitter(Configuration conf,
+      final Path rootDir, final Path srcDir,
+      Path oldLogDir, final FileSystem fs)  {
+
     @SuppressWarnings("unchecked")
     Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
         .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
     try {
-      return splitterClass.newInstance();
+       Constructor<? extends HLogSplitter> constructor =
+         splitterClass.getConstructor(
+          Configuration.class, // conf
+          Path.class, // rootDir
+          Path.class, // srcDir
+          Path.class, // oldLogDir
+          FileSystem.class); // fs
+      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException(e);
     } catch (InstantiationException e) {
       throw new RuntimeException(e);
     } catch (IllegalAccessException e) {
       throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(e);
+    } catch (SecurityException e) {
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
     }
   }
 
-  
-  
-  // Private immutable datastructure to hold Writer and its Path.
-  private final static class WriterAndPath {
-    final Path p;
-    final Writer w;
-
-    WriterAndPath(final Path p, final Writer w) {
-      this.p = p;
-      this.w = w;
-    }
+  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
+      Path oldLogDir, FileSystem fs) {
+    this.conf = conf;
+    this.rootDir = rootDir;
+    this.srcDir = srcDir;
+    this.oldLogDir = oldLogDir;
+    this.fs = fs;
+    
+    entryBuffers = new EntryBuffers(
+        conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
+            128*1024*1024));
+    outputSink = new OutputSink();
   }
-
+  
   /**
    * Split up a bunch of regionserver commit log files that are no longer being
    * written to, into new files, one per region for region to replay on startup.
    * Delete the old log files when finished.
    * 
-   * @param rootDir
-   *          qualified root directory of the HBase instance
-   * @param srcDir
-   *          Directory of log files to split: e.g.
-   *          <code>${ROOTDIR}/log_HOST_PORT</code>
-   * @param oldLogDir
-   *          directory where processed (split) logs will be archived to
-   * @param fs
-   *          FileSystem
-   * @param conf
-   *          Configuration
-   * @throws IOException
-   *           will throw if corrupted hlogs aren't tolerated
+   * @throws IOException will throw if corrupted hlogs aren't tolerated
    * @return the list of splits
    */
-  public List<Path> splitLog(final Path rootDir, final Path srcDir,
-      Path oldLogDir, final FileSystem fs, final Configuration conf)
+  public List<Path> splitLog()
       throws IOException {
+    Preconditions.checkState(!hasSplit,
+        "An HLogSplitter instance may only be used once");
+    hasSplit = true;
 
     long startTime = System.currentTimeMillis();
     List<Path> splits = null;
@@ -148,29 +183,8 @@ public class HLogSplitter {
     }
     LOG.info("Splitting " + logfiles.length + " hlog(s) in "
         + srcDir.toString());
-    splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
-    try {
-      FileStatus[] files = fs.listStatus(srcDir);
-      for (FileStatus file : files) {
-        Path newPath = HLog.getHLogArchivePath(oldLogDir, file.getPath());
-        LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to "
-            + FSUtils.getPath(newPath));
-        if (!fs.rename(file.getPath(), newPath)) {
-          throw new IOException("Unable to rename " + file.getPath() +
-            " to " + newPath);
-        }
-      }
-      LOG.debug("Moved " + files.length + " log files to "
-          + FSUtils.getPath(oldLogDir));
-      if (!fs.delete(srcDir, true)) {
-        throw new IOException("Unable to delete " + srcDir);
-      }
-    } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
-      IOException io = new IOException("Cannot delete: " + srcDir);
-      io.initCause(e);
-      throw io;
-    }
+    splits = splitLog(logfiles);
+    
     splitTime = System.currentTimeMillis() - startTime;
     LOG.info("hlog file splitting completed in " + splitTime +
         " ms for " + srcDir.toString());
@@ -190,100 +204,77 @@ public class HLogSplitter {
   public long getSize() {
     return this.splitSize;
   }
+
+  /**
+   * @return a map from encoded region ID to the number of edits written out
+   * for that region.
+   */
+  Map<byte[], Long> getOutputCounts() {
+    Preconditions.checkState(hasSplit);
+    return outputSink.getOutputCounts();
+  }
    
   /**
-   * Sorts the HLog edits in the given list of logfiles (that are a mix of edits
+   * Splits the HLog edits in the given list of logfiles (that are a mix of edits
    * on multiple regions) by region and then splits them per region directories,
    * in batches of (hbase.hlog.split.batch.size)
    * 
-   * A batch consists of a set of log files that will be sorted in a single map
-   * of edits indexed by region the resulting map will be concurrently written
-   * by multiple threads to their corresponding regions
+   * This process is split into multiple threads. In the main thread, we loop
+   * through the logs to be split. For each log, we:
+   * <ul>
+   *   <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
+   *   <li> Read each edit (see {@link #parseHLog}</li>
+   *   <li> Mark as "processed" or "corrupt" depending on outcome</li>
+   * </ul>
    * 
-   * Each batch consists of more more log files that are - recovered (files is
-   * opened for append then closed to ensure no process is writing into it) -
-   * parsed (each edit in the log is appended to a list of edits indexed by
-   * region see {@link #parseHLog} for more details) - marked as either
-   * processed or corrupt depending on parsing outcome - the resulting edits
-   * indexed by region are concurrently written to their corresponding region
-   * region directories - original files are then archived to a different
-   * directory
+   * Each edit is passed into the EntryBuffers instance, which takes care of
+   * memory accounting and splitting the edits by region.
    * 
+   * The OutputSink object then manages N other WriterThreads which pull chunks
+   * of edits from EntryBuffers and write them to the output region directories.
    * 
-   * 
-   * @param rootDir
-   *          hbase directory
-   * @param srcDir
-   *          logs directory
-   * @param oldLogDir
-   *          directory where processed logs are archived to
-   * @param logfiles
-   *          the list of log files to split
-   * @param fs
-   * @param conf
-   * @return
-   * @throws IOException
+   * After the process is complete, the log files are archived to a separate
+   * directory.
    */
-  private List<Path> splitLog(final Path rootDir, final Path srcDir,
-      Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
-      final Configuration conf) throws IOException {
+  private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
-    final Map<byte[], WriterAndPath> logWriters = Collections
-        .synchronizedMap(new TreeMap<byte[], WriterAndPath>(
-            Bytes.BYTES_COMPARATOR));
     List<Path> splits = null;
 
-    // Number of logs in a read batch
-    // More means faster but bigger mem consumption
-    // TODO make a note on the conf rename and update hbase-site.xml if needed
-    int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
 
     splitSize = 0;
+
+    outputSink.startWriterThreads(entryBuffers);
     
     try {
-      int i = -1;
-      while (i < logfiles.length) {
-        final Map<byte[], LinkedList<Entry>> editsByRegion = new TreeMap<byte[], LinkedList<Entry>>(
-            Bytes.BYTES_COMPARATOR);
-        for (int j = 0; j < logFilesPerStep; j++) {
-          i++;
-          if (i == logfiles.length) {
-            break;
-          }
-          FileStatus log = logfiles[i];
-          Path logPath = log.getPath();
-          long logLength = log.getLen();
-          splitSize += logLength;
-          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
-              + ": " + logPath + ", length=" + logLength);
-          try {
-            recoverFileLease(fs, logPath, conf);
-            parseHLog(log, editsByRegion, fs, conf);
+      int i = 0;
+      for (FileStatus log : logfiles) {
+       Path logPath = log.getPath();
+        long logLength = log.getLen();
+        splitSize += logLength;
+        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
+            + ": " + logPath + ", length=" + logLength);
+        try {
+          recoverFileLease(fs, logPath, conf);
+          parseHLog(log, entryBuffers, fs, conf);
+          processedLogs.add(logPath);
+        } catch (IOException e) {
+          // If the IOE resulted from bad file format,
+          // then this problem is idempotent and retrying won't help
+          if (e.getCause() instanceof ParseException) {
+            LOG.warn("ParseException from hlog " + logPath + ".  continuing");
             processedLogs.add(logPath);
-          } catch (EOFException eof) {
-              // truncated files are expected if a RS crashes (see HBASE-2643)
-              LOG.info("EOF from hlog " + logPath + ".  continuing");
-              processedLogs.add(logPath);
-          } catch (IOException e) {
-            // If the IOE resulted from bad file format,
-            // then this problem is idempotent and retrying won't help
-            if (e.getCause() instanceof ParseException) {
-              LOG.warn("ParseException from hlog " + logPath + ".  continuing");
-              processedLogs.add(logPath);
+          } else {
+            if (skipErrors) {
+              LOG.info("Got while parsing hlog " + logPath +
+                ". Marking as corrupted", e);
+              corruptedLogs.add(logPath);
             } else {
-              if (skipErrors) {
-                LOG.info("Got while parsing hlog " + logPath +
-                  ". Marking as corrupted", e);
-                corruptedLogs.add(logPath);
-              } else {
-                throw e;
-              }
+              throw e;
             }
           }
         }
-        writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
       }
       if (fs.listStatus(srcDir).length > processedLogs.size()
           + corruptedLogs.size()) {
@@ -291,86 +282,13 @@ public class HLogSplitter {
             "Discovered orphan hlog after split. Maybe the "
             + "HRegionServer was not dead when we started");
       }
-      archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);      
     } finally {
-      splits = new ArrayList<Path>(logWriters.size());
-      for (WriterAndPath wap : logWriters.values()) {
-        wap.w.close();
-        splits.add(wap.p);
-        LOG.debug("Closed " + wap.p);
-      }
+      splits = outputSink.finishWritingAndClose();
     }
     return splits;
   }
 
-
-  /**
-   * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
-   *
-   * @param splitLogsMap map that contains the log splitting result indexed by region
-   * @param logWriters map that contains a writer per region
-   * @param rootDir hbase root dir
-   * @param fs
-   * @param conf
-   * @throws IOException
-   */
-  private void writeEditsBatchToRegions(
-      final Map<byte[], LinkedList<Entry>> splitLogsMap,
-      final Map<byte[], WriterAndPath> logWriters, final Path rootDir,
-      final FileSystem fs, final Configuration conf) 
-	throws IOException {
-    // Number of threads to use when log splitting to rewrite the logs.
-    // More means faster but bigger mem consumption.
-    int logWriterThreads = conf.getInt(
-        "hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
-    HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("SplitWriter-%1$d");
-    ThreadFactory factory = builder.build();
-    ThreadPoolExecutor threadPool =
-      (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
-    for (final byte[] region : splitLogsMap.keySet()) {
-      Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap,
-          region, fs, conf);
-      writeFutureResult.put(region, threadPool.submit(splitter));
-    }
-
-    threadPool.shutdown();
-    // Wait for all threads to terminate
-    try {
-      for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
-        String message = "Waiting for hlog writers to terminate, elapsed " + j
-            * 5 + " seconds";
-        if (j < 30) {
-          LOG.debug(message);
-        } else {
-          LOG.info(message);
-        }
-
-      }
-    } catch (InterruptedException ex) {
-      LOG.warn("Hlog writers were interrupted, possible data loss!");
-      if (!skipErrors) {
-        throw new IOException("Could not finish writing log entries", ex);
-        // TODO maybe we should fail here regardless if skipErrors is active or not
-      }
-    }
-
-    for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
-      try {
-        entry.getValue().get();
-      } catch (ExecutionException e) {
-        throw (new IOException(e.getCause()));
-      } catch (InterruptedException e1) {
-        LOG.warn("Writer for region " + Bytes.toString(entry.getKey())
-            + " was interrupted, however the write process should have "
-            + "finished. Throwing up ", e1);
-        throw (new IOException(e1.getCause()));
-      }
-    }
-  }
-  
   /**
   * Moves processed logs to oldLogDir after successful processing. Moves
   * corrupted logs (any log that couldn't be successfully parsed) to corruptDir
@@ -383,7 +301,9 @@ public class HLogSplitter {
    * @param conf
    * @throws IOException
    */
-  private static void archiveLogs(final List<Path> corruptedLogs,
+  private static void archiveLogs(
+      final Path srcDir,
+      final List<Path> corruptedLogs,
       final List<Path> processedLogs, final Path oldLogDir,
       final FileSystem fs, final Configuration conf) throws IOException {
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
@@ -411,6 +331,10 @@ public class HLogSplitter {
         LOG.info("Archived processed log " + p + " to " + newPath);
       }
     }
+    
+    if (!fs.delete(srcDir, true)) {
+      throw new IOException("Unable to delete src dir: " + srcDir);
+    }
   }
 
   /**
@@ -460,7 +384,7 @@ public class HLogSplitter {
    * @throws IOException if hlog is corrupted, or can't be open
    */
   private void parseHLog(final FileStatus logfile,
-		final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
+		EntryBuffers entryBuffers, final FileSystem fs,
     final Configuration conf) 
 	throws IOException {
     // Check for possibly empty file. With appends, currently Hadoop reports a
@@ -490,15 +414,11 @@ public class HLogSplitter {
     try {
       Entry entry;
       while ((entry = in.next()) != null) {
-        byte[] region = entry.getKey().getEncodedRegionName();
-        LinkedList<Entry> queue = splitLogsMap.get(region);
-        if (queue == null) {
-          queue = new LinkedList<Entry>();
-          splitLogsMap.put(region, queue);
-        }
-        queue.addLast(entry);
+        entryBuffers.appendEntry(entry);
         editsCount++;
       }
+    } catch (InterruptedException ie) {
+      throw new RuntimeException(ie);
     } finally {
       LOG.debug("Pushed=" + editsCount + " entries from " + path);
       try {
@@ -506,76 +426,30 @@ public class HLogSplitter {
           in.close();
         }
       } catch (IOException e) {
-        LOG
-            .warn("Close log reader in finally threw exception -- continuing",
-                e);
+        LOG.warn("Close log reader in finally threw exception -- continuing",
+                 e);
       }
     }
   }
-  
-  private Callable<Void> createNewSplitter(final Path rootDir,
-      final Map<byte[], WriterAndPath> logWriters,
-      final Map<byte[], LinkedList<Entry>> logEntries, final byte[] region,
-      final FileSystem fs, final Configuration conf) {
-    return new Callable<Void>() {
-      public String getName() {
-        return "Split writer thread for region " + Bytes.toStringBinary(region);
-      }
 
-      @Override
-      public Void call() throws IOException {
-        LinkedList<Entry> entries = logEntries.get(region);
-        LOG.debug(this.getName() + " got " + entries.size() + " to process");
-        long threadTime = System.currentTimeMillis();
-        try {
-          int editsCount = 0;
-          WriterAndPath wap = logWriters.get(region);
-          for (Entry logEntry : entries) {
-            if (wap == null) {
-              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
-              if (regionedits == null) {
-                // we already print a message if it's null in getRegionSplitEditsPath
-                break;
-              }
-              if (fs.exists(regionedits)) {
-                LOG.warn("Found existing old edits file. It could be the "
-                    + "result of a previous failed split attempt. Deleting "
-                    + regionedits + ", length="
-                    + fs.getFileStatus(regionedits).getLen());
-                if (!fs.delete(regionedits, false)) {
-                  LOG.warn("Failed delete of old " + regionedits);
-                }
-              }
-              Writer w = createWriter(fs, regionedits, conf);
-              wap = new WriterAndPath(regionedits, w);
-              logWriters.put(region, wap);
-              LOG.debug("Creating writer path=" + regionedits + " region="
-                  + Bytes.toStringBinary(region));
-            }
-            wap.w.append(logEntry);
-            editsCount++;
-          }
-          LOG.debug(this.getName() + " Applied " + editsCount
-              + " total edits to " + Bytes.toStringBinary(region) + " in "
-              + (System.currentTimeMillis() - threadTime) + "ms");
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
-          LOG.fatal(this.getName() + " Got while writing log entry to log", e);
-          throw e;
-        }
-        return null;
-      }
-    };
+  private void writerThreadError(Throwable t) {
+    thrown.compareAndSet(null, t);
+  }
+  
+  /**
+   * Check for errors in the writer threads. If any is found, rethrow it.
+   */
+  private void checkForErrors() throws IOException {
+    Throwable thrown = this.thrown.get();
+    if (thrown == null) return;
+    if (thrown instanceof IOException) {
+      throw (IOException)thrown;
+    } else {
+      throw new RuntimeException(thrown);
+    }
   }
-
   /**
    * Create a new {@link Writer} for writing log splits.
-   * 
-   * @param fs
-   * @param logfile
-   * @param conf
-   * @return A new Writer instance
-   * @throws IOException
    */
   protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
@@ -584,16 +458,410 @@ public class HLogSplitter {
 
   /**
    * Create a new {@link Reader} for reading logs to split.
-   * 
-   * @param fs
-   * @param curLogFile
-   * @param conf
-   * @return A new Reader instance
-   * @throws IOException
    */
   protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
       throws IOException {
     return HLog.getReader(fs, curLogFile, conf);
   }
 
+
+  /**
+   * Class which accumulates edits and separates them into a buffer per region
+   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
+   * a predefined threshold.
+   * 
+   * Writer threads then pull region-specific buffers from this class.
+   */
+  class EntryBuffers {
+    Map<byte[], RegionEntryBuffer> buffers =
+      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
+    
+    /* Track which regions are currently in the middle of writing. We don't allow
+       an IO thread to pick up bytes from a region if we're already writing
+       data for that region in a different IO thread. */ 
+    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+    long totalBuffered = 0;
+    long maxHeapUsage;
+    
+    EntryBuffers(long maxHeapUsage) {
+      this.maxHeapUsage = maxHeapUsage;
+    }
+
+    /**
+     * Append a log entry into the corresponding region buffer.
+     * Blocks if the total heap usage has crossed the specified threshold.
+     * 
+     * @throws InterruptedException
+     * @throws IOException 
+     */
+    void appendEntry(Entry entry) throws InterruptedException, IOException {
+      HLogKey key = entry.getKey();
+      
+      RegionEntryBuffer buffer;
+      synchronized (this) {
+        buffer = buffers.get(key.getEncodedRegionName());
+        if (buffer == null) {
+          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
+          buffers.put(key.getEncodedRegionName(), buffer);
+        }
+        long incrHeap = buffer.appendEntry(entry);
+        totalBuffered += incrHeap;
+      }
+
+      // If we crossed the chunk threshold, wait for more space to be available
+      synchronized (dataAvailable) {
+        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
+          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
+          dataAvailable.wait(3000);
+        }
+        dataAvailable.notifyAll();
+      }
+      checkForErrors();
+    }
+
+    synchronized RegionEntryBuffer getChunkToWrite() {
+      long biggestSize = 0;
+      byte[] biggestBufferKey = null;
+
+      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
+        long size = entry.getValue().heapSize();
+        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
+          biggestSize = size;
+          biggestBufferKey = entry.getKey();
+        }
+      }
+      if (biggestBufferKey == null) {
+        return null;
+      }
+
+      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
+      currentlyWriting.add(biggestBufferKey);
+      return buffer;
+    }
+
+    void doneWriting(RegionEntryBuffer buffer) {
+      synchronized (this) {
+        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
+        assert removed;
+      }
+      long size = buffer.heapSize();
+
+      synchronized (dataAvailable) {
+        totalBuffered -= size;
+        // We may unblock writers
+        dataAvailable.notifyAll();
+      }
+    }
+    
+    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
+      return currentlyWriting.contains(region);
+    }
+  }
+
+  /**
+   * A buffer of some number of edits for a given region.
+   * This accumulates edits and also provides a memory optimization in order to
+   * share a single byte array instance for the table and region name.
+   * Also tracks memory usage of the accumulated edits.
+   */
+  static class RegionEntryBuffer implements HeapSize {
+    long heapInBuffer = 0;
+    List<Entry> entryBuffer;
+    byte[] tableName;
+    byte[] encodedRegionName;
+
+    RegionEntryBuffer(byte[] table, byte[] region) {
+      this.tableName = table;
+      this.encodedRegionName = region;
+      this.entryBuffer = new LinkedList<Entry>();
+    }
+
+    long appendEntry(Entry entry) {
+      internify(entry);
+      entryBuffer.add(entry);
+      long incrHeap = entry.getEdit().heapSize() +
+        ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
+        0; // TODO linkedlist entry
+      heapInBuffer += incrHeap;
+      return incrHeap;
+    }
+
+    private void internify(Entry entry) {
+      HLogKey k = entry.getKey();
+      k.internTableName(this.tableName);
+      k.internEncodedRegionName(this.encodedRegionName);
+    }
+
+    public long heapSize() {
+      return heapInBuffer;
+    }
+  }
+
+
+  class WriterThread extends Thread {
+    private volatile boolean shouldStop = false;
+    
+    WriterThread(int i) {
+      super("WriterThread-" + i);
+    }
+    
+    public void run()  {
+      try {
+        doRun();
+      } catch (Throwable t) {
+        LOG.error("Error in log splitting write thread", t);
+        writerThreadError(t);
+      }
+    }
+    
+    private void doRun() throws IOException {
+      LOG.debug("Writer thread " + this + ": starting");
+      while (true) {
+        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+        if (buffer == null) {
+          // No data currently available, wait on some more to show up
+          synchronized (dataAvailable) {
+            if (shouldStop) return;
+            try {
+              dataAvailable.wait(1000);
+            } catch (InterruptedException ie) {
+              if (!shouldStop) {
+                throw new RuntimeException(ie);
+              }
+            }
+          }
+          continue;
+        }
+        
+        assert buffer != null;
+        try {
+          writeBuffer(buffer);
+        } finally {
+          entryBuffers.doneWriting(buffer);
+        }
+      }
+    }
+       
+    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;      
+      if (entries.isEmpty()) {
+        LOG.warn(this.getName() + " got an empty buffer, skipping");
+        return;
+      }
+
+      WriterAndPath wap = null;
+      
+      long startTime = System.nanoTime();
+      try {
+        int editsCount = 0;
+
+        for (Entry logEntry : entries) {
+          if (wap == null) {
+            wap = outputSink.getWriterAndPath(logEntry);
+            if (wap == null) {
+              // getWriterAndPath decided we don't need to write these edits
+              // Message was already logged
+              return;
+            }
+          }
+          wap.w.append(logEntry);
+          editsCount++;
+        }
+        // Pass along summary statistics
+        wap.incrementEdits(editsCount);
+        wap.incrementNanoTime(System.nanoTime() - startTime);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(this.getName() + " Got while writing log entry to log", e);
+        throw e;
+      }
+    }
+    
+    void finish() {
+      shouldStop = true;
+    }
+  }
+
+  /**
+   * Class that manages the output streams from the log splitting process.
+   */
+  class OutputSink {
+    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
+          new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+    private final List<WriterThread> writerThreads = Lists.newArrayList();
+    
+    /* Set of regions which we've decided should not output edits */
+    private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
+        new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
+    
+    private boolean hasClosed = false;    
+    
+    /**
+     * Start the threads that will pump data from the entryBuffers
+     * to the output files.
+     * @return the list of started threads
+     */
+    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
+      // More threads could potentially write faster at the expense
+      // of causing more disk seeks as the logs are split.
+      // After a certain point (probably around 3 threads) the
+      // process will be bound on the reader in the current
+      // implementation anyway.
+      int numThreads = conf.getInt(
+          "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+
+      for (int i = 0; i < numThreads; i++) {
+        WriterThread t = new WriterThread(i);
+        t.start();
+        writerThreads.add(t);
+      }
+    }
+    
+    List<Path> finishWritingAndClose() throws IOException {
+      LOG.info("Waiting for split writer threads to finish");
+      for (WriterThread t : writerThreads) {
+        t.finish();
+      }
+      for (WriterThread t: writerThreads) {
+        try {
+          t.join();
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+        checkForErrors();
+      }
+      LOG.info("Split writers finished");
+      
+      return closeStreams();
+    }
+
+    /**
+     * Close all of the output streams.
+     * @return the list of paths written.
+     */
+    private List<Path> closeStreams() throws IOException {
+      Preconditions.checkState(!hasClosed);
+      
+      List<Path> paths = new ArrayList<Path>();
+      List<IOException> thrown = Lists.newArrayList();
+      
+      for (WriterAndPath wap : logWriters.values()) {
+        try {
+          wap.w.close();
+        } catch (IOException ioe) {
+          LOG.error("Couldn't close log at " + wap.p, ioe);
+          thrown.add(ioe);
+          continue;
+        }
+        paths.add(wap.p);
+        LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
+            + (wap.nanosSpent / 1000/ 1000) + "ms)");
+      }
+      if (!thrown.isEmpty()) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+      
+      hasClosed = true;
+      return paths;
+    }
+
+    /**
+     * Get a writer and path for a log starting at the given entry.
+     * 
+     * This function is threadsafe so long as multiple threads are always
+     * acting on different regions.
+     * 
+     * @return null if this region shouldn't output any logs
+     */
+    WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+    
+      byte region[] = entry.getKey().getEncodedRegionName();
+      WriterAndPath ret = logWriters.get(region);
+      if (ret != null) {
+        return ret;
+      }
+      
+      // If we already decided that this region doesn't get any output
+      // we don't need to check again.
+      if (blacklistedRegions.contains(region)) {
+        return null;
+      }
+      
+      // Need to create writer
+      Path regionedits = getRegionSplitEditsPath(fs,
+          entry, rootDir);
+      if (regionedits == null) {
+        // Edits dir doesn't exist
+        blacklistedRegions.add(region);
+        return null;
+      }
+      deletePreexistingOldEdits(regionedits);
+      Writer w = createWriter(fs, regionedits, conf);
+      ret = new WriterAndPath(regionedits, w);
+      logWriters.put(region, ret);
+      LOG.debug("Creating writer path=" + regionedits + " region="
+          + Bytes.toStringBinary(region));
+
+      return ret;
+    }
+
+    /**
+     * If the specified path exists, issue a warning and delete it.
+     */
+    private void deletePreexistingOldEdits(Path regionedits) throws IOException {
+      if (fs.exists(regionedits)) {
+        LOG.warn("Found existing old edits file. It could be the "
+            + "result of a previous failed split attempt. Deleting "
+            + regionedits + ", length="
+            + fs.getFileStatus(regionedits).getLen());
+        if (!fs.delete(regionedits, false)) {
+          LOG.warn("Failed delete of old " + regionedits);
+        }
+      }
+    }
+
+    /**
+     * @return a map from encoded region ID to the number of edits written out
+     * for that region.
+     */
+    private Map<byte[], Long> getOutputCounts() {
+      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
+          Bytes.BYTES_COMPARATOR);
+      synchronized (logWriters) {
+        for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
+          ret.put(entry.getKey(), entry.getValue().editsWritten);
+        }
+      }
+      return ret;
+    }
+  }
+
+  /**
+   *  Private data structure that wraps a Writer and its Path,
+   *  also collecting statistics about the data written to this
+   *  output.
+   */
+  private final static class WriterAndPath {
+    final Path p;
+    final Writer w;
+
+    /* Count of edits written to this path */
+    long editsWritten = 0;
+    /* Number of nanos spent writing to this log */
+    long nanosSpent = 0;
+
+    WriterAndPath(final Path p, final Writer w) {
+      this.p = p;
+      this.w = w;
+    }
+
+    void incrementEdits(int edits) {
+      editsWritten += edits;
+    }
+
+    void incrementNanoTime(long nanos) {
+      nanosSpent += nanos;
+    }
+  }
 }
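
Taken together, EntryBuffers and the WriterThreads form a bounded
producer/consumer pipeline: the lone reader thread stalls on the dataAvailable
monitor once buffered edits pass the configured heap limit, and every chunk a
writer drains notifies it to resume. A stripped-down sketch of just that
handshake (class and method names here are illustrative, not from the patch):

    // Illustrative monitor-based backpressure, modeled on
    // EntryBuffers.appendEntry() and doneWriting().
    class BoundedBuffer {
      private final Object dataAvailable = new Object();
      private long totalBuffered = 0;
      private final long maxHeapUsage;

      BoundedBuffer(long maxHeapUsage) {
        this.maxHeapUsage = maxHeapUsage;
      }

      void produce(long bytes) throws InterruptedException {
        synchronized (dataAvailable) {
          while (totalBuffered > maxHeapUsage) {
            dataAvailable.wait(3000); // reader stalls until writers drain
          }
          totalBuffered += bytes;
          dataAvailable.notifyAll();  // wake writers waiting for work
        }
      }

      void consumed(long bytes) {
        synchronized (dataAvailable) {
          totalBuffered -= bytes;
          dataAvailable.notifyAll(); // may unblock the reader
        }
      }
    }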

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Mon Dec 20 20:36:17 2010
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -66,7 +67,7 @@ import org.apache.hadoop.io.Writable;
  * is an old style KeyValue or the new style WALEdit.
  *
  */
-public class WALEdit implements Writable {
+public class WALEdit implements Writable, HeapSize {
 
   private final int VERSION_2 = -1;
 
@@ -154,7 +155,19 @@ public class WALEdit implements Writable
         out.writeInt(scopes.get(key));
       }
     }
+  }
 
+  public long heapSize() {
+    long ret = 0;
+    for (KeyValue kv : kvs) {
+      ret += kv.heapSize();
+    }
+    if (scopes != null) {
+      ret += ClassSize.TREEMAP;
+      ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
+      // TODO this isn't quite right, need help here
+    }
+    return ret;
   }
 
   public String toString() {
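
The heapSize() implementation above is what lets the splitter bound its
footprint: EntryBuffers adds each appended edit's heapSize(), plus two aligned
references charged by RegionEntryBuffer for the HLogKey, to a running total and
blocks the reader once it passes hbase.regionserver.hlog.splitlog.buffersize.
A rough, self-contained sketch of that arithmetic, reusing the KeyValueTestUtil
helper the tests below rely on:

    import org.apache.hadoop.hbase.KeyValueTestUtil;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.ClassSize;

    public class HeapAccountingSketch {
      public static void main(String[] args) {
        WALEdit edit = new WALEdit();
        edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
        // Per-entry charge mirrors RegionEntryBuffer.appendEntry().
        long perEntry = edit.heapSize()
            + ClassSize.align(2 * ClassSize.REFERENCE);
        long bufferBytes = 128L * 1024 * 1024; // default splitlog buffersize
        System.out.println("Roughly " + (bufferBytes / perEntry)
            + " small entries fit before the reader thread blocks");
      }
    }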

Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Mon Dec 20 20:36:17 2010
@@ -164,10 +164,10 @@ public class TestHLog  {
         log.rollWriter();
       }
       log.close();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, logdir, this.oldLogDir, this.fs);
       List<Path> splits =
-        logSplitter.splitLog(hbaseDir, logdir,
-          this.oldLogDir, this.fs, conf);
+        logSplitter.splitLog();
       verifySplits(splits, howmany);
       log = null;
     } finally {

Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java Mon Dec 20 20:36:17 2010
@@ -24,16 +24,28 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.util.NavigableSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
+import static org.mockito.Mockito.mock;
 
 /**
  * Simple testing of a few HLog methods.
  */
 public class TestHLogMethods {
+  private static final byte[] TEST_REGION = Bytes.toBytes("test_region");
+  private static final byte[] TEST_TABLE = Bytes.toBytes("test_table");
+  
   private final HBaseTestingUtility util = new HBaseTestingUtility();
 
   /**
@@ -84,4 +96,71 @@ public class TestHLogMethods {
     FSDataOutputStream fdos = fs.create(new Path(testdir, name), true);
     fdos.close();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testRegionEntryBuffer() throws Exception {
+    HLogSplitter.RegionEntryBuffer reb = new HLogSplitter.RegionEntryBuffer(
+        TEST_TABLE, TEST_REGION);
+    assertEquals(0, reb.heapSize());
+
+    reb.appendEntry(createTestLogEntry(1));
+    assertTrue(reb.heapSize() > 0);
+  }
+  
+  @Test
+  public void testEntrySink() throws Exception {
+    Configuration conf = new Configuration();
+    HLogSplitter splitter = HLogSplitter.createLogSplitter(
+        conf, mock(Path.class), mock(Path.class), mock(Path.class),
+        mock(FileSystem.class));
+
+    EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
+    for (int i = 0; i < 1000; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }
+    
+    assertTrue(sink.totalBuffered > 0);
+    long amountInChunk = sink.totalBuffered;
+    // Get a chunk
+    RegionEntryBuffer chunk = sink.getChunkToWrite();
+    assertEquals(chunk.heapSize(), amountInChunk);
+    
+    // Make sure it got marked that a thread is "working on this"
+    assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
+
+    // Insert some more entries
+    for (int i = 0; i < 500; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }    
+    // Asking for another chunk shouldn't work since the first one
+    // is still writing
+    assertNull(sink.getChunkToWrite());
+    
+    // If we say we're done writing the first chunk, then we should be able
+    // to get the second
+    sink.doneWriting(chunk);
+    
+    RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+    assertNotNull(chunk2);
+    assertNotSame(chunk, chunk2);
+    long amountInChunk2 = sink.totalBuffered;
+    // The second chunk had fewer rows than the first
+    assertTrue(amountInChunk2 < amountInChunk);
+    
+    sink.doneWriting(chunk2);
+    assertEquals(0, sink.totalBuffered);
+  }
+  
+  private HLog.Entry createTestLogEntry(int i) {
+    long seq = i;
+    long now = i * 1000;
+    
+    WALEdit edit = new WALEdit();
+    edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
+    HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now);
+    HLog.Entry entry = new HLog.Entry(key, edit);
+    return entry;
+  }
+}

Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Mon Dec 20 20:36:17 2010
@@ -19,16 +19,14 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,7 +37,9 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -52,9 +52,16 @@ import org.apache.hadoop.hbase.util.Thre
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Testing {@link HLog} splitting code.
@@ -119,11 +126,15 @@ public class TestHLogSplit {
 
   @Before
   public void setUp() throws Exception {
+    flushToConsole("Cleaning up cluster for new test\n"
+        + "--------------------------");
     conf = TEST_UTIL.getConfiguration();
     fs = TEST_UTIL.getDFSCluster().getFileSystem();
     FileStatus[] entries = fs.listStatus(new Path("/"));
+    flushToConsole("Num entries in /:" + entries.length);
     for (FileStatus dir : entries){
-      fs.delete(dir.getPath(), true);
+      assertTrue("Deleting " + dir.getPath(),
+          fs.delete(dir.getPath(), true));
     }
     seq = 0;
     regions = new ArrayList<String>();
@@ -161,18 +172,23 @@ public class TestHLogSplit {
   public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
   throws IOException {
     AtomicBoolean stop = new AtomicBoolean(false);
+    
+    FileStatus[] stats = fs.listStatus(new Path("/hbase/t1"));
+    assertTrue("Previous test should clean up table dir",
+        stats == null || stats.length == 0);
+
     generateHLogs(-1);
-    fs.initialize(fs.getUri(), conf);
+    
     try {
     (new ZombieNewLogWriterRegionServer(stop)).start();
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     } finally {
       stop.set(true);
     }
   }
 
-
   @Test
   public void testSplitPreservesEdits() throws IOException{
     final String REGION = "region__1";
@@ -181,8 +197,9 @@ public class TestHLogSplit {
 
     generateHLogs(1, 10, -1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+      hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -202,8 +219,9 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
 
     for (String region : regions) {
@@ -224,8 +242,9 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -240,8 +259,9 @@ public class TestHLogSplit {
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -260,8 +280,9 @@ public class TestHLogSplit {
             Corruptions.APPEND_GARBAGE, true, fs);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
       assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
@@ -278,8 +299,9 @@ public class TestHLogSplit {
             Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
       assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
@@ -296,8 +318,9 @@ public class TestHLogSplit {
     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
             Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -323,13 +346,13 @@ public class TestHLogSplit {
     Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
       conf.setClass("hbase.regionserver.hlog.reader.impl",
           FaultySequenceFileLogReader.class, HLog.Reader.class);
-      String[] failureTypes = { "begin", "middle", "end" };
       for (FaultySequenceFileLogReader.FailureType  failureType : FaultySequenceFileLogReader.FailureType.values()) {
         conf.set("faultysequencefilelogreader.failuretype", failureType.name());
         generateHLogs(1, ENTRIES, -1);
         fs.initialize(fs.getUri(), conf);
-        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-        logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+            hbaseDir, hlogDir, oldLogDir, fs);
+        logSplitter.splitLog();
         FileStatus[] archivedLogs = fs.listStatus(corruptDir);
         assertEquals("expected a different file", c1.getName(), archivedLogs[0]
             .getPath().getName());
@@ -358,8 +381,9 @@ public class TestHLogSplit {
       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
       generateHLogs(Integer.MAX_VALUE);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     } finally {
       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
           Reader.class);
@@ -383,9 +407,10 @@ public class TestHLogSplit {
       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
       generateHLogs(-1);
       fs.initialize(fs.getUri(), conf);
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, hlogDir, oldLogDir, fs);
       try {
-        logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+        logSplitter.splitLog();
       } catch (IOException e) {
         assertEquals(
             "if skip.errors is false all files should remain in place",
@@ -413,8 +438,9 @@ public class TestHLogSplit {
     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -437,8 +463,9 @@ public class TestHLogSplit {
     generateHLogs(-1);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
 
@@ -449,8 +476,9 @@ public class TestHLogSplit {
   public void testSplit() throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -464,12 +492,16 @@ public class TestHLogSplit {
   throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     FileStatus [] statuses = null;
     try {
       statuses = fs.listStatus(hlogDir);
-      assertNull(statuses);
+      if (statuses != null) {
+        Assert.fail("Files left in log dir: " +
+            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
+      }
     } catch (FileNotFoundException e) {
       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
     }
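
The null-vs-FNFE dance above recurs in these tests. If it spread further, one way
to localize it would be a small compatibility helper along these lines (a
hypothetical addition, not part of this patch; it assumes only the listStatus
behaviors the comment describes):

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsCompat {
      /**
       * Hypothetical helper (not in the patch): normalizes "directory missing"
       * across Hadoop versions. 0.20 returns null from listStatus, 0.21 throws
       * FileNotFoundException; callers get an empty array either way.
       */
      public static FileStatus[] listStatusSafe(FileSystem fs, Path dir)
          throws IOException {
        try {
          FileStatus[] statuses = fs.listStatus(dir);
          return statuses == null ? new FileStatus[0] : statuses;
        } catch (FileNotFoundException fnfe) {
          return new FileStatus[0];
        }
      }
    }
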
@@ -516,8 +548,9 @@ public class TestHLogSplit {
     try {
       zombie.start();
       try {
-        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-        logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+            hbaseDir, hlogDir, oldLogDir, fs);
+        logSplitter.splitLog();
       } catch (IOException ex) {/* expected */}
       int logFilesNumber = fs.listStatus(hlogDir).length;
 
@@ -549,11 +582,12 @@ public class TestHLogSplit {
 
     try {
       InstrumentedSequenceFileLogWriter.activateFailure = true;
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, hlogDir, oldLogDir, fs);
+      logSplitter.splitLog();
 
     } catch (IOException e) {
-      assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
+      assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage());
       throw e;
     } finally {
       InstrumentedSequenceFileLogWriter.activateFailure = false;
@@ -561,7 +595,10 @@ public class TestHLogSplit {
   }
 
 
-//  @Test
+  // @Test TODO this test has been disabled since it was created!
+  // It currently fails because the second split doesn't output anything:
+  // there are no region dirs left after we move aside the first
+  // split result.
   public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
 
     regions.removeAll(regions);
@@ -572,8 +609,9 @@ public class TestHLogSplit {
     generateHLogs(1, 100, -1);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     fs.rename(oldLogDir, hlogDir);
     Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
     Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
@@ -582,7 +620,9 @@ public class TestHLogSplit {
 
 
     fs.initialize(fs.getUri(), conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
   }
@@ -600,11 +640,161 @@ public class TestHLogSplit {
     Path regiondir = new Path(tabledir, region);
     fs.delete(regiondir, true);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     
     assertFalse(fs.exists(regiondir));
   }
+  
+  @Test
+  public void testIOEOnOutputThread() throws Exception {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+    generateHLogs(-1);
+
+    fs.initialize(fs.getUri(), conf);
+    // Set up a splitter that will throw an IOE on the output side
+    HLogSplitter logSplitter = new HLogSplitter(
+        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+      throws IOException {
+        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
+        Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
+        return mockWriter;
+        
+      }
+    };
+    try {
+      logSplitter.splitLog();
+      fail("Didn't throw!");
+    } catch (IOException ioe) {
+      assertTrue(ioe.toString().contains("Injected"));
+    }
+  }
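
The injection pattern in testIOEOnOutputThread above is plain Mockito doThrow
stubbing. Stripped of HBase types, a minimal self-contained sketch (assuming
JUnit 4 and Mockito on the test classpath; Appender is a made-up stand-in for
HLog.Writer):

    import static org.junit.Assert.assertTrue;
    import static org.junit.Assert.fail;

    import java.io.IOException;

    import org.junit.Test;
    import org.mockito.Mockito;

    public class FaultInjectionSketch {
      /** Hypothetical sink standing in for HLog.Writer. */
      interface Appender {
        void append(String entry) throws IOException;
      }

      @Test
      public void appendFailureSurfacesToCaller() throws Exception {
        Appender appender = Mockito.mock(Appender.class);
        // Every append() throws, mimicking a dead output writer thread.
        Mockito.doThrow(new IOException("Injected"))
            .when(appender).append(Mockito.anyString());
        try {
          appender.append("edit");
          fail("Didn't throw!");
        } catch (IOException ioe) {
          assertTrue(ioe.toString().contains("Injected"));
        }
      }
    }
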
+  
+  /**
+   * Test the log split process with fake data and lots of edits to trigger
+   * threading issues.
+   */
+  @Test
+  public void testThreading() throws Exception {
+    doTestThreading(20000, 128*1024*1024, 0);
+  }
+  
+  /**
+   * Test blocking behavior of the log split process if writers are writing slower
+   * than the reader is reading.
+   */
+  @Test
+  public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    doTestThreading(200, 1024, 50);
+  }
+  
+  /**
+   * Sets up a log splitter with a mock reader and writer. The mock reader generates
+   * a specified number of edits spread across 5 regions. The mock writer optionally
+   * sleeps for each edit it is fed.
+   *
+   * After the split is complete, verifies that the statistics show the correct number
+   * of edits output into each region.
+   * 
+   * @param numFakeEdits number of fake edits to push through pipeline
+   * @param bufferSize size of in-memory buffer
+   * @param writerSlowness writer threads will sleep this many ms per edit
+   */
+  private void doTestThreading(final int numFakeEdits,
+      final int bufferSize,
+      final int writerSlowness) throws Exception {
+
+    Configuration localConf = new Configuration(conf);
+    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
+
+    // Create a fake log file (we'll override the reader to produce a stream of edits)
+    FSDataOutputStream out = fs.create(new Path(hlogDir, HLOG_FILE_PREFIX + ".fake"));
+    out.close();
+
+    // Make region dirs for our destination regions so the output doesn't get skipped
+    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); 
+    makeRegionDirs(fs, regions);
+
+    // Create a splitter that reads and writes the data without touching disk
+    HLogSplitter logSplitter = new HLogSplitter(
+        localConf, hbaseDir, hlogDir, oldLogDir, fs) {
+      
+      /* Produce a mock writer that doesn't write anywhere */
+      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+      throws IOException {
+        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
+        Mockito.doAnswer(new Answer<Void>() {
+          int expectedIndex = 0;
+          
+          @Override
+          public Void answer(InvocationOnMock invocation) {
+            if (writerSlowness > 0) {
+              try {
+                Thread.sleep(writerSlowness);
+              } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+              }
+            }
+            HLog.Entry entry = (Entry) invocation.getArguments()[0];
+            WALEdit edit = entry.getEdit();
+            List<KeyValue> keyValues = edit.getKeyValues();
+            assertEquals(1, keyValues.size());
+            KeyValue kv = keyValues.get(0);
+            
+            // Check that the edits come in the right order.
+            assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
+            expectedIndex++;
+            return null;
+          }
+        }).when(mockWriter).append(Mockito.<HLog.Entry>any());
+        return mockWriter;
+      }
+      
+      
+      /* Produce a mock reader that generates fake entries */
+      protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
+      throws IOException {
+        Reader mockReader = Mockito.mock(Reader.class);
+        Mockito.doAnswer(new Answer<HLog.Entry>() {
+          int index = 0;
+
+          @Override
+          public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
+            if (index >= numFakeEdits) return null;
+           
+            // Generate r0 through r4 in round robin fashion
+            int regionIdx = index % regions.size();
+            byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
+            
+            HLog.Entry ret = createTestEntry(TABLE_NAME, region,
+                Bytes.toBytes((int)(index / regions.size())),
+                FAMILY, QUALIFIER, VALUE, index);
+            index++;
+            return ret;
+          }
+        }).when(mockReader).next();
+        return mockReader;
+      }
+    };
+    
+    logSplitter.splitLog();
+    
+    // Verify number of written edits per region
+
+    Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
+    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
+      LOG.info("Got " + entry.getValue() + " output edits for region " + 
+          Bytes.toString(entry.getKey()));
+      
+      assertEquals(numFakeEdits / regions.size(), (long) entry.getValue());
+    }
+    assertEquals(regions.size(), outputCounts.size());
+  }
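
Of the two parameterizations above, testThreading pushes 20000 edits through a
128MB buffer with no writer delay, while testThreadingSlowWriterSmallBuffer
shrinks the buffer to 1KB and sleeps 50ms per write, forcing the reader to block
once the buffer fills. The blocking being exercised is ordinary bounded-queue
backpressure; in miniature (a sketch of the general technique, not
HLogSplitter's actual internals):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BackpressureSketch {
      public static void main(String[] args) throws InterruptedException {
        // A tiny capacity plays the role of a small
        // hbase.regionserver.hlog.splitlog.buffersize.
        final BlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(4);

        Thread slowWriter = new Thread(new Runnable() {
          public void run() {
            try {
              int edit;
              while ((edit = buffer.take()) >= 0) {  // -1 is the stop sentinel
                Thread.sleep(50);                    // simulated slow writer
              }
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
            }
          }
        });
        slowWriter.start();

        for (int i = 0; i < 20; i++) {
          buffer.put(i);  // blocks whenever 4 edits are already in flight
        }
        buffer.put(-1);   // tell the writer to stop
        slowWriter.join();
      }
    }
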
+  
+  
 
   /**
    * This thread will keep writing to the file after the split process has started
@@ -677,29 +867,19 @@ public class TestHLogSplit {
       if (stop.get()) {
         return;
       }
-      boolean splitStarted = false;
-      Path p = new Path(hbaseDir, new String(TABLE_NAME));
-      while (!splitStarted) {
-        try {
-          FileStatus [] statuses = fs.listStatus(p);
-          // In 0.20, listStatus comes back with a null if file doesn't exit.
-          // In 0.21, it throws FNFE.
-          if (statuses != null && statuses.length > 0) {
-            // Done.
-            break;
-          }
-        } catch (FileNotFoundException e) {
-          // Expected in hadoop 0.21
-        } catch (IOException e1) {
-          assertTrue("Failed to list status ", false);
-        }
-        flushToConsole("Juliet: split not started, sleeping a bit...");
-        Threads.sleep(100);
-      }
+      Path tableDir = new Path(hbaseDir, new String(TABLE_NAME));
+      Path regionDir = new Path(tableDir, regions.get(0));      
+      Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS);
       String region = "juliet";
       Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
       try {
-        fs.mkdirs(new Path(new Path(hbaseDir, region), region));
+
+        while (!fs.exists(recoveredEdits) && !stop.get()) {
+          flushToConsole("Juliet: split not started, sleeping a bit...");
+          Threads.sleep(10);
+        }
+
+        fs.mkdirs(new Path(tableDir, region));
         HLog.Writer writer = HLog.createWriter(fs,
                 julietLog, conf);
         appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
@@ -722,10 +902,15 @@ public class TestHLogSplit {
     generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
   }
 
-  private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
+  private void makeRegionDirs(FileSystem fs, List<String> regions) throws IOException {
     for (String region : regions) {
+      flushToConsole("Creating dir for region " + region);
       fs.mkdirs(new Path(tabledir, region));
     }
+  }
+  
+  private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
+    makeRegionDirs(fs, regions);
     for (int i = 0; i < writers; i++) {
       writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
       for (int j = 0; j < entries; j++) {
@@ -835,14 +1020,20 @@ public class TestHLogSplit {
                           byte[] value, long seq)
           throws IOException {
 
+    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
+    writer.sync();
+    return seq;
+  }
+  
+  private HLog.Entry createTestEntry(
+      byte[] table, byte[] region,
+      byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, long seq) {
     long time = System.nanoTime();
     WALEdit edit = new WALEdit();
     seq++;
     edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
-    writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit));
-    writer.sync();
-    return seq;
-
+    return new HLog.Entry(new HLogKey(region, table, seq, time), edit);
   }
 
 
@@ -864,6 +1055,14 @@ public class TestHLogSplit {
   private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
     FileStatus[] f1 = fs.listStatus(p1);
     FileStatus[] f2 = fs.listStatus(p2);
+    assertNotNull("Path " + p1 + " doesn't exist", f1);
+    assertNotNull("Path " + p2 + " doesn't exist", f2);
+    
+    System.out.println("Files in " + p1 + ": " +
+        Joiner.on(",").join(FileUtil.stat2Paths(f1)));
+    System.out.println("Files in " + p2 + ": " +
+        Joiner.on(",").join(FileUtil.stat2Paths(f2)));
+    assertEquals(f1.length, f2.length);
 
     for (int i = 0; i < f1.length; i++) {
       // Regions now have a directory named RECOVERED_EDITS_DIR and in here

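The new diagnostics in compareHLogSplitDirs, like the "Files left in log dir"
failure message earlier, lean on Guava's Joiner plus Hadoop's
FileUtil.stat2Paths to report a listing in one line. A minimal sketch of the
Joiner half (assuming Guava on the classpath; the file names are made up):

    import com.google.common.base.Joiner;

    public class JoinerSketch {
      public static void main(String[] args) {
        String[] names = { "hlog.0", "hlog.1", "hlog.2" };
        // Joiner.on(",").join(...) accepts arrays and Iterables alike.
        System.out.println("Files left in log dir: "
            + Joiner.on(",").join(names));
      }
    }
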
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Mon Dec 20 20:36:17 2010
@@ -487,9 +487,9 @@ public class TestWALReplay {
    */
   private Path runWALSplit(final Configuration c) throws IOException {
     FileSystem fs = FileSystem.get(c);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c);
-    List<Path> splits = logSplitter.splitLog(this.hbaseRootDir, this.logDir,
-      this.oldLogDir, fs, c);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
+        this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
+    List<Path> splits = logSplitter.splitLog();
     // Split should generate only 1 file since there's only 1 region
     assertEquals(1, splits.size());
     // Make sure the file exists