Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/18 19:16:15 UTC

svn commit: r1094662 [2/3] - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/main/java/org/apache/hadoop/hbase/zookeeper/ ...

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1094662&r1=1094661&r2=1094662&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Mon Apr 18 17:16:15 2011
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.ut
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.text.ParseException;
@@ -44,6 +45,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -52,8 +54,12 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -73,7 +79,7 @@ public class HLogSplitter {
    */
   public static final String RECOVERED_EDITS = "recovered.edits";
 
-  
+
   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
 
   private boolean hasSplit = false;
@@ -87,7 +93,7 @@ public class HLogSplitter {
   protected final Path oldLogDir;
   protected final FileSystem fs;
   protected final Configuration conf;
-  
+
   // Major subcomponents of the split process.
   // These are separated into inner classes to make testing easier.
   OutputSink outputSink;
@@ -101,17 +107,18 @@ public class HLogSplitter {
   // consumed by the reader thread, or an exception occurred
   Object dataAvailable = new Object();
 
-  
+
   /**
    * Create a new HLogSplitter using the given {@link Configuration} and the
    * <code>hbase.hlog.splitter.impl</code> property to derived the instance
    * class to use.
-   *
+   * <p>
    * @param conf
    * @param rootDir hbase directory
    * @param srcDir logs directory
    * @param oldLogDir directory where processed logs are archived to
    * @param fs FileSystem
+   * @return New HLogSplitter instance
    */
   public static HLogSplitter createLogSplitter(Configuration conf,
       final Path rootDir, final Path srcDir,
@@ -151,18 +158,18 @@ public class HLogSplitter {
     this.srcDir = srcDir;
     this.oldLogDir = oldLogDir;
     this.fs = fs;
-    
+
     entryBuffers = new EntryBuffers(
         conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
             128*1024*1024));
     outputSink = new OutputSink();
   }
-  
+
   /**
    * Split up a bunch of regionserver commit log files that are no longer being
    * written to, into new files, one per region for region to replay on startup.
    * Delete the old log files when finished.
-   * 
+   *
    * @throws IOException will throw if corrupted hlogs aren't tolerated
    * @return the list of splits
    */
@@ -172,7 +179,7 @@ public class HLogSplitter {
         "An HLogSplitter instance may only be used once");
     hasSplit = true;
 
-    long startTime = System.currentTimeMillis();
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
     List<Path> splits = null;
     if (!fs.exists(srcDir)) {
       // Nothing to do
@@ -186,20 +193,20 @@ public class HLogSplitter {
     LOG.info("Splitting " + logfiles.length + " hlog(s) in "
         + srcDir.toString());
     splits = splitLog(logfiles);
-    
-    splitTime = System.currentTimeMillis() - startTime;
+
+    splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     LOG.info("hlog file splitting completed in " + splitTime +
         " ms for " + srcDir.toString());
     return splits;
   }
-  
+
   /**
    * @return time that this split took
    */
   public long getTime() {
     return this.splitTime;
   }
-  
+
   /**
    * @return aggregate size of hlogs that were split
    */
@@ -215,12 +222,12 @@ public class HLogSplitter {
     Preconditions.checkState(hasSplit);
     return outputSink.getOutputCounts();
   }
-   
+
   /**
    * Splits the HLog edits in the given list of logfiles (that are a mix of edits
    * on multiple regions) by region and then splits them per region directories,
    * in batches of (hbase.hlog.split.batch.size)
-   * 
+   * <p>
    * This process is split into multiple threads. In the main thread, we loop
    * through the logs to be split. For each log, we:
    * <ul>
@@ -228,13 +235,13 @@ public class HLogSplitter {
    *   <li> Read each edit (see {@link #parseHLog}</li>
    *   <li> Mark as "processed" or "corrupt" depending on outcome</li>
    * </ul>
-   * 
+   * <p>
    * Each edit is passed into the EntryBuffers instance, which takes care of
    * memory accounting and splitting the edits by region.
-   * 
+   * <p>
    * The OutputSink object then manages N other WriterThreads which pull chunks
    * of edits from EntryBuffers and write them to the output region directories.
-   * 
+   * <p>
    * After the process is complete, the log files are archived to a separate
    * directory.
    */
@@ -248,7 +255,7 @@ public class HLogSplitter {
     splitSize = 0;
 
     outputSink.startWriterThreads(entryBuffers);
-    
+
     try {
       int i = 0;
       for (FileStatus log : logfiles) {
@@ -257,36 +264,24 @@ public class HLogSplitter {
         splitSize += logLength;
         LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
             + ": " + logPath + ", length=" + logLength);
+        Reader in;
         try {
-          recoverFileLease(fs, logPath, conf);
-          parseHLog(log, entryBuffers, fs, conf);
-          processedLogs.add(logPath);
-        } catch (EOFException eof) {
-          // truncated files are expected if a RS crashes (see HBASE-2643)
-          LOG.info("EOF from hlog " + logPath + ". Continuing");
-          processedLogs.add(logPath);
-        } catch (FileNotFoundException fnfe) {
-          // A file may be missing if the region server was able to archive it
-          // before shutting down. This means the edits were persisted already
-          LOG.info("A log was missing " + logPath +
-              ", probably because it was moved by the" +
-              " now dead region server. Continuing");
-          processedLogs.add(logPath);
-        } catch (IOException e) {
-          // If the IOE resulted from bad file format,
-          // then this problem is idempotent and retrying won't help
-          if (e.getCause() instanceof ParseException) {
-            LOG.warn("Parse exception from hlog " + logPath + ".  continuing", e);
-            processedLogs.add(logPath);
-          } else {
-            if (skipErrors) {
-              LOG.info("Got while parsing hlog " + logPath +
-                ". Marking as corrupted", e);
-              corruptedLogs.add(logPath);
-            } else {
-              throw e;
+          in = getReader(fs, log, conf, skipErrors);
+          if (in != null) {
+            parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
+            try {
+              in.close();
+            } catch (IOException e) {
+              LOG.warn("Close log reader threw exception -- continuing",
+                  e);
             }
           }
+          processedLogs.add(logPath);
+        } catch (CorruptedLogFileException e) {
+          LOG.info("Got while parsing hlog " + logPath +
+              ". Marking as corrupted", e);
+          corruptedLogs.add(logPath);
+          continue;
         }
       }
       if (fs.listStatus(srcDir).length > processedLogs.size()
@@ -295,7 +290,7 @@ public class HLogSplitter {
             "Discovered orphan hlog after split. Maybe the "
             + "HRegionServer was not dead when we started");
       }
-      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);      
+      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
     } finally {
       splits = outputSink.finishWritingAndClose();
     }
@@ -303,10 +298,214 @@ public class HLogSplitter {
   }
 
   /**
+   * Splits a HLog file into a temporary staging area. tmpname is used to build
+   * the name of the staging area where the recovered-edits will be separated
+   * out by region and stored.
+   * <p>
+   * If the log file has N regions then N recovered.edits files will be
+   * produced. There is no buffering in this code. Instead it relies on the
+   * buffering in the SequenceFileWriter.
+   * <p>
+   * @param rootDir
+   * @param tmpname
+   * @param logfile
+   * @param fs
+   * @param conf
+   * @param reporter
+   * @return false if it is interrupted by the progress-able.
+   * @throws IOException
+   */
+  static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
+      FileStatus logfile, FileSystem fs,
+      Configuration conf, CancelableProgressable reporter) throws IOException {
+    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
+        fs);
+    return s.splitLogFileToTemp(logfile, tmpname, reporter);
+  }
+
+  public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
+      CancelableProgressable reporter)  throws IOException {
+    final Map<byte[], Object> logWriters = Collections.
+    synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
+    boolean isCorrupted = false;
+
+    Object BAD_WRITER = new Object();
+
+    boolean progress_failed = false;
+
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
+    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
+    // How often to send a progress report (default 1/2 master timeout)
+    int period = conf.getInt("hbase.splitlog.report.period",
+        conf.getInt("hbase.splitlog.manager.timeout",
+            ZKSplitLog.DEFAULT_TIMEOUT) / 2);
+    Path logPath = logfile.getPath();
+    long logLength = logfile.getLen();
+    LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+    Reader in = null;
+    try {
+      in = getReader(fs, logfile, conf, skipErrors);
+    } catch (CorruptedLogFileException e) {
+      LOG.warn("Could not get reader, corrupted log file " + logPath, e);
+      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      isCorrupted = true;
+    }
+    if (in == null) {
+      LOG.warn("Nothing to split in log file " + logPath);
+      return true;
+    }
+    long t = EnvironmentEdgeManager.currentTimeMillis();
+    long last_report_at = t;
+    if (reporter != null && reporter.progress() == false) {
+      return false;
+    }
+    int editsCount = 0;
+    Entry entry;
+    try {
+      while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+        byte[] region = entry.getKey().getEncodedRegionName();
+        Object o = logWriters.get(region);
+        if (o == BAD_WRITER) {
+          continue;
+        }
+        WriterAndPath wap = (WriterAndPath)o;
+        if (wap == null) {
+          wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
+          if (wap == null) {
+            logWriters.put(region, BAD_WRITER);
+          } else {
+            logWriters.put(region, wap);
+          }
+        }
+        wap.w.append(entry);
+        editsCount++;
+        if (editsCount % interval == 0) {
+          long t1 = EnvironmentEdgeManager.currentTimeMillis();
+          if ((t1 - last_report_at) > period) {
+            last_report_at = t;
+            if (reporter != null && reporter.progress() == false) {
+              progress_failed = true;
+              return false;
+            }
+          }
+        }
+      }
+    } catch (CorruptedLogFileException e) {
+      LOG.warn("Could not parse, corrupted log file " + logPath, e);
+      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      isCorrupted = true;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      throw e;
+    } finally {
+      int n = 0;
+      for (Object o : logWriters.values()) {
+        long t1 = EnvironmentEdgeManager.currentTimeMillis();
+        if ((t1 - last_report_at) > period) {
+          last_report_at = t;
+          if ((progress_failed == false) && (reporter != null) &&
+              (reporter.progress() == false)) {
+            progress_failed = true;
+          }
+        }
+        if (o == BAD_WRITER) {
+          continue;
+        }
+        n++;
+        WriterAndPath wap = (WriterAndPath)o;
+        wap.w.close();
+        LOG.debug("Closed " + wap.p);
+      }
+      LOG.info("processed " + editsCount + " edits across " + n + " regions" +
+          " threw away edits for " + (logWriters.size() - n) + " regions" +
+          " log file = " + logPath +
+          " is corrupted = " + isCorrupted);
+    }
+    return true;
+  }
+
+  /**
+   * Completes the work done by splitLogFileToTemp by moving the
+   * recovered.edits from the staging area to the respective region server's
+   * directories.
+   * <p>
+   * It is invoked by SplitLogManager once it knows that one of the
+   * SplitLogWorkers have completed the splitLogFileToTemp() part. If the
+   * master crashes then this function might get called multiple times.
+   * <p>
+   * @param tmpname
+   * @param conf
+   * @throws IOException
+   */
+  public static void moveRecoveredEditsFromTemp(String tmpname,
+      String logfile, Configuration conf)
+  throws IOException{
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
+  }
+
+  public static void moveRecoveredEditsFromTemp(String tmpname,
+      Path rootdir, Path oldLogDir,
+      String logfile, Configuration conf)
+  throws IOException{
+    List<Path> processedLogs = new ArrayList<Path>();
+    List<Path> corruptedLogs = new ArrayList<Path>();
+    FileSystem fs;
+    fs = rootdir.getFileSystem(conf);
+    Path logPath = new Path(logfile);
+    if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+      corruptedLogs.add(logPath);
+    } else {
+      processedLogs.add(logPath);
+    }
+    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
+    List<FileStatus> files = listAll(fs, stagingDir);
+    for (FileStatus f : files) {
+      Path src = f.getPath();
+      Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
+      if (ZKSplitLog.isCorruptFlagFile(dst)) {
+        continue;
+      }
+      if (fs.exists(dst)) {
+        fs.delete(dst, false);
+      } else {
+        Path dstdir = dst.getParent();
+        if (!fs.exists(dstdir)) {
+          if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+        }
+      }
+      fs.rename(src, dst);
+      LOG.debug(" moved " + src + " => " + dst);
+    }
+    archiveLogs(null, corruptedLogs, processedLogs,
+        oldLogDir, fs, conf);
+    fs.delete(stagingDir, true);
+    return;
+  }
+
+  private static List<FileStatus> listAll(FileSystem fs, Path dir)
+  throws IOException {
+    List<FileStatus> fset = new ArrayList<FileStatus>(100);
+    FileStatus [] files = fs.listStatus(dir);
+    if (files != null) {
+      for (FileStatus f : files) {
+        if (f.isDir()) {
+          fset.addAll(listAll(fs, f.getPath()));
+        } else {
+          fset.add(f);
+        }
+      }
+    }
+    return fset;
+  }
+
+
+  /**
    * Moves processed logs to a oldLogDir after successful processing Moves
    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
    * (.corrupt) for later investigation
-   * 
+   *
    * @param corruptedLogs
    * @param processedLogs
    * @param oldLogDir
@@ -329,7 +528,7 @@ public class HLogSplitter {
 
     for (Path corrupted : corruptedLogs) {
       Path p = new Path(corruptDir, corrupted.getName());
-      if (!fs.rename(corrupted, p)) { 
+      if (!fs.rename(corrupted, p)) {
         LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
       } else {
         LOG.info("Moving corrupted log " + corrupted + " to " + p);
@@ -344,8 +543,8 @@ public class HLogSplitter {
         LOG.info("Archived processed log " + p + " to " + newPath);
       }
     }
-    
-    if (!fs.delete(srcDir, true)) {
+
+    if (srcDir != null && !fs.delete(srcDir, true)) {
       throw new IOException("Unable to delete src dir: " + srcDir);
     }
   }
@@ -363,19 +562,21 @@ public class HLogSplitter {
    * @throws IOException
    */
   static Path getRegionSplitEditsPath(final FileSystem fs,
-      final Entry logEntry, final Path rootDir) throws IOException {
+      final Entry logEntry, final Path rootDir, boolean isCreate)
+  throws IOException {
     Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
         .getTablename());
     Path regiondir = HRegion.getRegionDir(tableDir,
         Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+    Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+
     if (!fs.exists(regiondir)) {
       LOG.info("This region's directory doesn't exist: "
           + regiondir.toString() + ". It is very likely that it was" +
           " already split so it's safe to discard those edits.");
       return null;
     }
-    Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
-    if (!fs.exists(dir)) {
+    if (isCreate && !fs.exists(dir)) {
       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
     }
     return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
@@ -385,7 +586,7 @@ public class HLogSplitter {
   static String formatRecoveredEditsFileName(final long seqid) {
     return String.format("%019d", seqid);
   }
-  
+
   /*
    * Parse a single hlog and put the edits in @splitLogsMap
    *
@@ -394,61 +595,116 @@ public class HLogSplitter {
    * list of edits as values
    * @param fs the filesystem
    * @param conf the configuration
-   * @throws IOException if hlog is corrupted, or can't be open
+   * @throws IOException
+   * @throws CorruptedLogFileException if hlog is corrupted
    */
-  private void parseHLog(final FileStatus logfile,
+  private void parseHLog(final Reader in, Path path,
 		EntryBuffers entryBuffers, final FileSystem fs,
-    final Configuration conf) 
-	throws IOException {
-    // Check for possibly empty file. With appends, currently Hadoop reports a
-    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
-    // HDFS-878 is committed.
-    long length = logfile.getLen();
-    if (length <= 0) {
-      LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
-    }
-    Path path = logfile.getPath();
-    Reader in;
+    final Configuration conf, boolean skipErrors)
+	throws IOException, CorruptedLogFileException {
     int editsCount = 0;
     try {
-      in = getReader(fs, path, conf);
-    } catch (EOFException e) {
-      if (length <= 0) {
-	      //TODO should we ignore an empty, not-last log file if skip.errors is false?
-        //Either way, the caller should decide what to do. E.g. ignore if this is the last
-        //log in sequence.
-        //TODO is this scenario still possible if the log has been recovered (i.e. closed)
-        LOG.warn("Could not open " + path + " for reading. File is empty" + e);
-        return;
-      } else {
-        throw e;
-      }
-    }
-    try {
       Entry entry;
-      while ((entry = in.next()) != null) {
+      while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
         entryBuffers.appendEntry(entry);
         editsCount++;
       }
     } catch (InterruptedException ie) {
-      throw new RuntimeException(ie);
+      IOException t = new InterruptedIOException();
+      t.initCause(ie);
+      throw t;
     } finally {
       LOG.debug("Pushed=" + editsCount + " entries from " + path);
+    }
+  }
+
+  /**
+   * Create a new {@link Reader} for reading logs to split.
+   *
+   * @param fs
+   * @param file
+   * @param conf
+   * @return A new Reader instance
+   * @throws IOException
+   * @throws CorruptedLogFile
+   */
+  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
+      boolean skipErrors)
+      throws IOException, CorruptedLogFileException {
+    Path path = file.getPath();
+    long length = file.getLen();
+    Reader in;
+
+
+    // Check for possibly empty file. With appends, currently Hadoop reports a
+    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+    // HDFS-878 is committed.
+    if (length <= 0) {
+      LOG.warn("File " + path + " might be still open, length is 0");
+    }
+
+    try {
+      recoverFileLease(fs, path, conf);
       try {
-        if (in != null) {
-          in.close();
+        in = getReader(fs, path, conf);
+      } catch (EOFException e) {
+        if (length <= 0) {
+          // TODO should we ignore an empty, not-last log file if skip.errors
+          // is false? Either way, the caller should decide what to do. E.g.
+          // ignore if this is the last log in sequence.
+          // TODO is this scenario still possible if the log has been
+          // recovered (i.e. closed)
+          LOG.warn("Could not open " + path + " for reading. File is empty", e);
+          return null;
+        } else {
+          // EOFException being ignored
+          return null;
         }
-      } catch (IOException e) {
-        LOG.warn("Close log reader in finally threw exception -- continuing",
-                 e);
       }
+    } catch (IOException e) {
+      if (!skipErrors) {
+        throw e;
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Could not open hlog " +
+            path + " ignoring");
+      t.initCause(e);
+      throw t;
+    }
+    return in;
+  }
+
+  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  throws CorruptedLogFileException, IOException {
+    try {
+      return in.next();
+    } catch (EOFException eof) {
+      // truncated files are expected if a RS crashes (see HBASE-2643)
+      LOG.info("EOF from hlog " + path + ".  continuing");
+      return null;
+    } catch (IOException e) {
+      // If the IOE resulted from bad file format,
+      // then this problem is idempotent and retrying won't help
+      if (e.getCause() instanceof ParseException) {
+        LOG.warn("ParseException from hlog " + path + ".  continuing");
+        return null;
+      }
+      if (!skipErrors) {
+        throw e;
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
+            " while parsing hlog " + path + ". Marking as corrupted");
+      t.initCause(e);
+      throw t;
     }
   }
 
+
   private void writerThreadError(Throwable t) {
     thrown.compareAndSet(null, t);
   }
-  
+
   /**
    * Check for errors in the writer threads. If any is found, rethrow it.
    */
@@ -477,26 +733,25 @@ public class HLogSplitter {
     return HLog.getReader(fs, curLogFile, conf);
   }
 
-
   /**
    * Class which accumulates edits and separates them into a buffer per region
    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
    * a predefined threshold.
-   * 
+   *
    * Writer threads then pull region-specific buffers from this class.
    */
   class EntryBuffers {
     Map<byte[], RegionEntryBuffer> buffers =
       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
-    
+
     /* Track which regions are currently in the middle of writing. We don't allow
        an IO thread to pick up bytes from a region if we're already writing
-       data for that region in a different IO thread. */ 
+       data for that region in a different IO thread. */
     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
 
     long totalBuffered = 0;
     long maxHeapUsage;
-    
+
     EntryBuffers(long maxHeapUsage) {
       this.maxHeapUsage = maxHeapUsage;
     }
@@ -504,13 +759,13 @@ public class HLogSplitter {
     /**
      * Append a log entry into the corresponding region buffer.
      * Blocks if the total heap usage has crossed the specified threshold.
-     * 
+     *
      * @throws InterruptedException
-     * @throws IOException 
+     * @throws IOException
      */
     void appendEntry(Entry entry) throws InterruptedException, IOException {
       HLogKey key = entry.getKey();
-      
+
       RegionEntryBuffer buffer;
       synchronized (this) {
         buffer = buffers.get(key.getEncodedRegionName());
@@ -566,7 +821,7 @@ public class HLogSplitter {
         dataAvailable.notifyAll();
       }
     }
-    
+
     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
       return currentlyWriting.contains(region);
     }
@@ -614,11 +869,11 @@ public class HLogSplitter {
 
   class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
-    
+
     WriterThread(int i) {
       super("WriterThread-" + i);
     }
-    
+
     public void run()  {
       try {
         doRun();
@@ -627,7 +882,7 @@ public class HLogSplitter {
         writerThreadError(t);
       }
     }
-    
+
     private void doRun() throws IOException {
       LOG.debug("Writer thread " + this + ": starting");
       while (true) {
@@ -646,7 +901,7 @@ public class HLogSplitter {
           }
           continue;
         }
-        
+
         assert buffer != null;
         try {
           writeBuffer(buffer);
@@ -655,16 +910,16 @@ public class HLogSplitter {
         }
       }
     }
-       
+
     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
-      List<Entry> entries = buffer.entryBuffer;      
+      List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn(this.getName() + " got an empty buffer, skipping");
         return;
       }
 
       WriterAndPath wap = null;
-      
+
       long startTime = System.nanoTime();
       try {
         int editsCount = 0;
@@ -690,12 +945,74 @@ public class HLogSplitter {
         throw e;
       }
     }
-    
+
     void finish() {
       shouldStop = true;
     }
   }
 
+  private WriterAndPath createWAP(byte[] region, Entry entry,
+      Path rootdir, String tmpname, FileSystem fs, Configuration conf)
+  throws IOException {
+    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir,
+        tmpname==null);
+    if (regionedits == null) {
+      return null;
+    }
+    if ((tmpname == null) && fs.exists(regionedits)) {
+      LOG.warn("Found existing old edits file. It could be the "
+          + "result of a previous failed split attempt. Deleting "
+          + regionedits + ", length="
+          + fs.getFileStatus(regionedits).getLen());
+      if (!fs.delete(regionedits, false)) {
+        LOG.warn("Failed delete of old " + regionedits);
+      }
+    }
+    Path editsfile;
+    if (tmpname != null) {
+      // During distributed log splitting the output by each
+      // SplitLogWorker is written to a temporary area.
+      editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
+    } else {
+      editsfile = regionedits;
+    }
+    Writer w = createWriter(fs, editsfile, conf);
+    LOG.debug("Creating writer path=" + editsfile + " region="
+        + Bytes.toStringBinary(region));
+    return (new WriterAndPath(editsfile, w));
+  }
+
+  Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
+    List<String> components = new ArrayList<String>(10);
+    do {
+      components.add(edits.getName());
+      edits = edits.getParent();
+    } while (edits.depth() > rootdir.depth());
+    Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
+    for (int i = components.size() - 1; i >= 0; i--) {
+      ret = new Path(ret, components.get(i));
+    }
+    try {
+      if (fs.exists(ret)) {
+        LOG.warn("Found existing old temporary edits file. It could be the "
+            + "result of a previous failed split attempt. Deleting "
+            + ret + ", length="
+            + fs.getFileStatus(ret).getLen());
+        if (!fs.delete(ret, false)) {
+          LOG.warn("Failed delete of old " + ret);
+        }
+      }
+      Path dir = ret.getParent();
+      if (!fs.exists(dir)) {
+        if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not prepare temp staging area ", e);
+      // ignore, exceptions will be thrown elsewhere
+    }
+    return ret;
+  }
+
   /**
    * Class that manages the output streams from the log splitting process.
    */
@@ -703,13 +1020,13 @@ public class HLogSplitter {
     private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
           new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
     private final List<WriterThread> writerThreads = Lists.newArrayList();
-    
+
     /* Set of regions which we've decided should not output edits */
     private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
         new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
-    
-    private boolean hasClosed = false;    
-    
+
+    private boolean hasClosed = false;
+
     /**
      * Start the threads that will pump data from the entryBuffers
      * to the output files.
@@ -730,7 +1047,7 @@ public class HLogSplitter {
         writerThreads.add(t);
       }
     }
-    
+
     List<Path> finishWritingAndClose() throws IOException {
       LOG.info("Waiting for split writer threads to finish");
       for (WriterThread t : writerThreads) {
@@ -745,7 +1062,7 @@ public class HLogSplitter {
         checkForErrors();
       }
       LOG.info("Split writers finished");
-      
+
       return closeStreams();
     }
 
@@ -755,10 +1072,10 @@ public class HLogSplitter {
      */
     private List<Path> closeStreams() throws IOException {
       Preconditions.checkState(!hasClosed);
-      
+
       List<Path> paths = new ArrayList<Path>();
       List<IOException> thrown = Lists.newArrayList();
-      
+
       for (WriterAndPath wap : logWriters.values()) {
         try {
           wap.w.close();
@@ -774,67 +1091,40 @@ public class HLogSplitter {
       if (!thrown.isEmpty()) {
         throw MultipleIOException.createIOException(thrown);
       }
-      
+
       hasClosed = true;
       return paths;
     }
 
     /**
      * Get a writer and path for a log starting at the given entry.
-     * 
+     *
      * This function is threadsafe so long as multiple threads are always
      * acting on different regions.
-     * 
+     *
      * @return null if this region shouldn't output any logs
      */
     WriterAndPath getWriterAndPath(Entry entry) throws IOException {
-    
       byte region[] = entry.getKey().getEncodedRegionName();
       WriterAndPath ret = logWriters.get(region);
       if (ret != null) {
         return ret;
       }
-      
       // If we already decided that this region doesn't get any output
       // we don't need to check again.
       if (blacklistedRegions.contains(region)) {
         return null;
       }
-      
-      // Need to create writer
-      Path regionedits = getRegionSplitEditsPath(fs,
-          entry, rootDir);
-      if (regionedits == null) {
-        // Edits dir doesn't exist
+      ret = createWAP(region, entry, rootDir, null, fs, conf);
+      if (ret == null) {
         blacklistedRegions.add(region);
         return null;
       }
-      deletePreexistingOldEdits(regionedits);
-      Writer w = createWriter(fs, regionedits, conf);
-      ret = new WriterAndPath(regionedits, w);
       logWriters.put(region, ret);
-      LOG.debug("Creating writer path=" + regionedits + " region="
-          + Bytes.toStringBinary(region));
-
       return ret;
     }
 
     /**
-     * If the specified path exists, issue a warning and delete it.
-     */
-    private void deletePreexistingOldEdits(Path regionedits) throws IOException {
-      if (fs.exists(regionedits)) {
-        LOG.warn("Found existing old edits file. It could be the "
-            + "result of a previous failed split attempt. Deleting "
-            + regionedits + ", length="
-            + fs.getFileStatus(regionedits).getLen());
-        if (!fs.delete(regionedits, false)) {
-          LOG.warn("Failed delete of old " + regionedits);
-        }
-      }
-    }
-
-    /**
      * @return a map from encoded region ID to the number of edits written out
      * for that region.
      */
@@ -850,6 +1140,8 @@ public class HLogSplitter {
     }
   }
 
+
+
   /**
    *  Private data structure that wraps a Writer and its Path,
    *  also collecting statistics about the data written to this
@@ -877,4 +1169,11 @@ public class HLogSplitter {
       nanosSpent += nanos;
     }
   }
+
+  static class CorruptedLogFileException extends Exception {
+    private static final long serialVersionUID = 1L;
+    CorruptedLogFileException(String s) {
+      super(s);
+    }
+  }
 }
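
A minimal sketch of how the two new entry points above are intended to work together in distributed log splitting: a SplitLogWorker writes recovered edits into a temporary staging area with splitLogFileToTemp(), and the master later calls moveRecoveredEditsFromTemp() to move them into the region directories. All variables here (conf, fs, logPath, workerName, reporter) are hypothetical placeholders:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;

public class DistributedSplitSketch {
  static void splitOneLog(Configuration conf, FileSystem fs, Path logPath,
      String workerName, CancelableProgressable reporter) throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    FileStatus logfile = fs.getFileStatus(logPath);
    // Phase 1 (region server / SplitLogWorker): split the hlog into the
    // per-task staging area under <rootdir>/splitlog/<tmpname>.
    String tmpname = ZKSplitLog.getSplitLogDirTmpComponent(workerName,
        logPath.getName());
    boolean notCancelled = HLogSplitter.splitLogFileToTemp(rootDir, tmpname,
        logfile, fs, conf, reporter);
    // Phase 2 (master, once the task is DONE): move recovered.edits from the
    // staging area into each region's directory and archive the source log.
    if (notCancelled) {
      HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logPath.toString(), conf);
    }
  }
}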

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1094662&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Mon Apr 18 17:16:15 2011
@@ -0,0 +1,271 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Common methods and attributes used by {@link SplitLogManager} and
+ * {@link SplitLogWorker}
+ */
+public class ZKSplitLog {
+  private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
+
+  public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
+  public static final int DEFAULT_ZK_RETRIES = 3;
+  public static final int DEFAULT_MAX_RESUBMIT = 3;
+  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
+
+  /**
+   * Gets the full path node name for the log file being split
+   * @param zkw zk reference
+   * @param filename log file name (only the basename)
+   */
+  public static String getNodeName(ZooKeeperWatcher zkw, String filename) {
+      return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
+  }
+
+  public static String getFileName(String node) {
+    String basename = node.substring(node.lastIndexOf('/') + 1);
+    return decode(basename);
+  }
+
+
+  public static String encode(String s) {
+    try {
+      return URLEncoder.encode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("URLENCODER doesn't support UTF-8");
+    }
+  }
+
+  public static String decode(String s) {
+    try {
+      return URLDecoder.decode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("URLDecoder doesn't support UTF-8");
+    }
+  }
+
+  public static String getRescanNode(ZooKeeperWatcher zkw) {
+    return getNodeName(zkw, "RESCAN");
+  }
+
+  public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) {
+    String prefix = getRescanNode(zkw);
+    if (path.length() <= prefix.length()) {
+      return false;
+    }
+    for (int i = 0; i < prefix.length(); i++) {
+      if (prefix.charAt(i) != path.charAt(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) {
+    String dirname = path.substring(0, path.lastIndexOf('/'));
+    return dirname.equals(zkw.splitLogZNode);
+  }
+
+  public static enum TaskState {
+    TASK_UNASSIGNED("unassigned"),
+    TASK_OWNED("owned"),
+    TASK_RESIGNED("resigned"),
+    TASK_DONE("done"),
+    TASK_ERR("err");
+
+    private final byte[] state;
+    private TaskState(String s) {
+      state = s.getBytes();
+    }
+
+    public byte[] get(String serverName) {
+      return (Bytes.add(state, " ".getBytes(), serverName.getBytes()));
+    }
+
+    public String getWriterName(byte[] data) {
+      String str = Bytes.toString(data);
+      return str.substring(str.indexOf(' ') + 1);
+    }
+
+
+    /**
+     * @param s
+     * @return True if {@link #state} is a prefix of s. False otherwise.
+     */
+    public boolean equals(byte[] s) {
+      if (s.length < state.length) {
+        return (false);
+      }
+      for (int i = 0; i < state.length; i++) {
+        if (state[i] != s[i]) {
+          return (false);
+        }
+      }
+      return (true);
+    }
+
+    public boolean equals(byte[] s, String serverName) {
+      return (Arrays.equals(s, get(serverName)));
+    }
+    @Override
+    public String toString() {
+      return new String(state);
+    }
+  }
+
+  public static Path getSplitLogDir(Path rootdir, String tmpname) {
+    return new Path(new Path(rootdir, "splitlog"), tmpname);
+  }
+
+  public static Path stripSplitLogTempDir(Path rootdir, Path file) {
+    int skipDepth = rootdir.depth() + 2;
+    List<String> components = new ArrayList<String>(10);
+    do {
+      components.add(file.getName());
+      file = file.getParent();
+    } while (file.depth() > skipDepth);
+    Path ret = rootdir;
+    for (int i = components.size() - 1; i >= 0; i--) {
+      ret = new Path(ret, components.get(i));
+    }
+    return ret;
+  }
+
+  public static String getSplitLogDirTmpComponent(String worker, String file) {
+    return (worker + "_" + ZKSplitLog.encode(file));
+  }
+
+  public static void markCorrupted(Path rootdir, String tmpname,
+      FileSystem fs) {
+    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    try {
+      fs.createNewFile(file);
+    } catch (IOException e) {
+      LOG.warn("Could not flag a log file as corrupted. Failed to create " +
+          file, e);
+    }
+  }
+
+  public static boolean isCorrupted(Path rootdir, String tmpname,
+      FileSystem fs) throws IOException {
+    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    boolean isCorrupt;
+    isCorrupt = fs.exists(file);
+    return isCorrupt;
+  }
+
+  public static boolean isCorruptFlagFile(Path file) {
+    return file.getName().equals("corrupt");
+  }
+
+
+  public static class Counters {
+    //SplitLogManager counters
+    public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_batch_success =
+      new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
+    public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
+    public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_threshold_reached =
+      new AtomicLong(0);
+    public static AtomicLong tot_mgr_missing_state_in_delete =
+      new AtomicLong(0);
+    public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
+    public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
+    public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
+    public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
+    public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
+
+
+
+    // SplitLogWorker counters
+    public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_exception =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_owned =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
+    public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
+    public static AtomicLong tot_wkr_final_transistion_failed =
+      new AtomicLong(0);
+
+    public static void resetCounters() throws Exception {
+      Class<?> cl = (new Counters()).getClass();
+      Field[] flds = cl.getDeclaredFields();
+      for (Field fld : flds) {
+        ((AtomicLong)fld.get(null)).set(0);
+      }
+    }
+  }
+}
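
A short sketch of the naming and staging-path helpers defined above (zkw, rootdir and the worker/log names are hypothetical placeholders, not from this commit):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class ZKSplitLogSketch {
  static void demo(ZooKeeperWatcher zkw, Path rootdir) {
    // Task znodes live under splitLogZNode and are named after the
    // URL-encoded hlog file name.
    String task = ZKSplitLog.getNodeName(zkw, "hlog.1234");
    String file = ZKSplitLog.getFileName(task);        // decodes back to "hlog.1234"

    // Task state payloads are "<state> <servername>"; equals(byte[]) does a
    // prefix match on the state.
    byte[] owned = ZKSplitLog.TaskState.TASK_OWNED.get("worker-1");
    boolean isOwned = ZKSplitLog.TaskState.TASK_OWNED.equals(owned);
    String owner = ZKSplitLog.TaskState.TASK_OWNED.getWriterName(owned);   // "worker-1"

    // Staging area layout: <rootdir>/splitlog/<worker>_<encoded log name>/...
    String tmpname = ZKSplitLog.getSplitLogDirTmpComponent("worker-1", file);
    Path staging = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
    // stripSplitLogTempDir maps a path under the staging dir back to its
    // final location under rootdir.
    Path finalPath = ZKSplitLog.stripSplitLogTempDir(rootdir,
        new Path(staging, "table/region/recovered.edits/0000000000000000042"));
  }
}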

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1094662&r1=1094661&r2=1094662&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Mon Apr 18 17:16:15 2011
@@ -575,7 +575,7 @@ public class ZKUtil {
    *
    * @param zkw zk reference
    * @param znode path of node
-   * @param stat node status to set if node exists
+   * @param stat node status to get if node exists
    * @return data of the specified znode, or null if node does not exist
    * @throws KeeperException if unexpected zookeeper exception
    */
@@ -583,7 +583,7 @@ public class ZKUtil {
       Stat stat)
   throws KeeperException {
     try {
-      byte [] data = zkw.getZooKeeper().getData(znode, zkw, stat);
+      byte [] data = zkw.getZooKeeper().getData(znode, null, stat);
       logRetrievedMsg(zkw, znode, data, false);
       return data;
     } catch (KeeperException.NoNodeException e) {
@@ -879,8 +879,7 @@ public class ZKUtil {
    */
   public static void asyncCreate(ZooKeeperWatcher zkw,
       String znode, byte [] data, final AsyncCallback.StringCallback cb,
-      final Object ctx)
-  throws KeeperException, KeeperException.NodeExistsException {
+      final Object ctx) {
     zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT, cb, ctx);
   }
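
For reference, a minimal sketch of calling asyncCreate() with its new exception-free signature; the znode path and payload below are placeholders, not values used by this commit:

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;

public class AsyncCreateSketch {
  static void queueCreate(ZooKeeperWatcher zkw) {
    // The callback fires on the ZooKeeper event thread when the create
    // completes; errors now surface through the rc code, not exceptions.
    AsyncCallback.StringCallback cb = new AsyncCallback.StringCallback() {
      public void processResult(int rc, String path, Object ctx, String name) {
        if (rc != KeeperException.Code.OK.intValue()) {
          // handle NODEEXISTS, connection loss, etc.
        }
      }
    };
    ZKUtil.asyncCreate(zkw, "/hbase/splitlog/example-task",
        Bytes.toBytes("unassigned"), cb, null);
  }
}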

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=1094662&r1=1094661&r2=1094662&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Mon Apr 18 17:16:15 2011
@@ -89,6 +89,8 @@ public class ZooKeeperWatcher implements
   public String tableZNode;
   // znode containing the unique cluster ID
   public String clusterIdZNode;
+  // znode used for log splitting work assignment
+  public String splitLogZNode;
 
   private final Configuration conf;
 
@@ -165,6 +167,7 @@ public class ZooKeeperWatcher implements
       ZKUtil.createAndFailSilent(this, assignmentZNode);
       ZKUtil.createAndFailSilent(this, rsZNode);
       ZKUtil.createAndFailSilent(this, tableZNode);
+      ZKUtil.createAndFailSilent(this, splitLogZNode);
     } catch (KeeperException e) {
       throw new ZooKeeperConnectionException(
           prefix("Unexpected KeeperException creating base node"), e);
@@ -210,6 +213,8 @@ public class ZooKeeperWatcher implements
         conf.get("zookeeper.znode.tableEnableDisable", "table"));
     clusterIdZNode = ZKUtil.joinZNode(baseZNode,
         conf.get("zookeeper.znode.clusterId", "hbaseid"));
+    splitLogZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.splitlog", "splitlog"));
   }
 
   /**
@@ -247,7 +252,7 @@ public class ZooKeeperWatcher implements
 
   /**
    * Method called from ZooKeeper for events and connection status.
-   *
+   * <p>
    * Valid events are passed along to listeners.  Connection status changes
    * are dealt with locally.
    */
@@ -302,12 +307,12 @@ public class ZooKeeperWatcher implements
 
   /**
    * Called when there is a connection-related event via the Watcher callback.
-   *
+   * <p>
    * If Disconnected or Expired, this should shutdown the cluster. But, since
    * we send a KeeperException.SessionExpiredException along with the abort
    * call, it's possible for the Abortable to catch it and try to create a new
    * session with ZooKeeper. This is what the client does in HCM.
-   *
+   * <p>
    * @param event
    */
   private void connectionEvent(WatchedEvent event) {
@@ -376,11 +381,11 @@ public class ZooKeeperWatcher implements
 
   /**
    * Handles KeeperExceptions in client calls.
-   *
+   * <p>
    * This may be temporary but for now this gives one place to deal with these.
-   *
+   * <p>
    * TODO: Currently this method rethrows the exception to let the caller handle
-   *
+   * <p>
    * @param ke
    * @throws KeeperException
    */
@@ -392,13 +397,13 @@ public class ZooKeeperWatcher implements
 
   /**
    * Handles InterruptedExceptions in client calls.
-   *
+   * <p>
    * This may be temporary but for now this gives one place to deal with these.
-   *
+   * <p>
    * TODO: Currently, this method does nothing.
    *       Is this ever expected to happen?  Do we abort or can we let it run?
    *       Maybe this should be logged as WARN?  It shouldn't happen?
-   *
+   * <p>
    * @param ie
    */
   public void interruptedException(InterruptedException ie) {

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1094662&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Mon Apr 18 17:16:15 2011
@@ -0,0 +1,445 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestDistributedLogSplitting {
+  private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
+  static {
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+  }
+
+  // Start a cluster with 2 masters and 3 regionservers
+  final int NUM_MASTERS = 2;
+  final int NUM_RS = 6;
+
+  MiniHBaseCluster cluster;
+  HMaster master;
+  Configuration conf;
+  HBaseTestingUtility TEST_UTIL;
+
+  @Before
+  public void before() throws Exception {
+
+  }
+
+  private void startCluster(int num_rs) throws Exception{
+    ZKSplitLog.Counters.resetCounters();
+    LOG.info("Starting cluster");
+    conf = HBaseConfiguration.create();
+    conf.getLong("hbase.splitlog.max.resubmit", 0);
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs);
+    cluster = TEST_UTIL.getHBaseCluster();
+    LOG.info("Waiting for active/ready master");
+    cluster.waitForActiveAndReadyMaster();
+    master = cluster.getMaster();
+  }
+
+  @After
+  public void after() throws Exception {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testThreeRSAbort() throws Exception {
+    LOG.info("testThreeRSAbort");
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_ROWS_PER_REGION = 100;
+
+    startCluster(NUM_RS);
+
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
+        "distributed log splitting test", null);
+
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    populateDataInTable(NUM_ROWS_PER_REGION, "family");
+
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    assertEquals(NUM_RS, rsts.size());
+    rsts.get(0).getRegionServer().abort("testing");
+    rsts.get(1).getRegionServer().abort("testing");
+    rsts.get(2).getRegionServer().abort("testing");
+
+    long start = EnvironmentEdgeManager.currentTimeMillis();
+    while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
+      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
+        assertTrue(false);
+      }
+      Thread.sleep(200);
+    }
+
+    start = EnvironmentEdgeManager.currentTimeMillis();
+    while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 2)) {
+      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
+        assertTrue(false);
+      }
+      Thread.sleep(200);
+    }
+
+    assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
+        TEST_UTIL.countRows(ht));
+  }
+
+  @Test(expected=OrphanHLogAfterSplitException.class)
+  public void testOrphanLogCreation() throws Exception {
+    LOG.info("testOrphanLogCreation");
+    startCluster(NUM_RS);
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs = rsts.get(0).getRegionServer();
+    Path rootdir = FSUtils.getRootDir(conf);
+    final Path logDir = new Path(rootdir,
+        HLog.getHLogDirectoryName(hrs.getServerName()));
+
+    installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+        "table", "family", 40);
+
+    makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
+        1000, 100);
+
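+    // once distributed splitting starts, keep dropping extra files into the
+    // log directory to provoke the expected OrphanHLogAfterSplitException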
+    new Thread() {
+      public void run() {
+        int i = 0;
+        while (true) {
+          try {
+            while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
+              0) {
+              Thread.yield();
+            }
+            fs.createNewFile(new Path(logDir, "foo" + i++));
+          } catch (Exception e) {
+            LOG.debug("file creation failed", e);
+            return;
+          }
+        }
+      }
+    }.start();
+    slm.splitLogDistributed(logDir);
+    FileStatus[] files = fs.listStatus(logDir);
+    if (files != null) {
+      for (FileStatus file : files) {
+        LOG.debug("file still there " + file.getPath());
+      }
+    }
+  }
+
+  @Test
+  public void testRecoveredEdits() throws Exception {
+    LOG.info("testRecoveredEdits");
+    startCluster(NUM_RS);
+    final int NUM_LOG_LINES = 1000;
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    // turn off load balancing to prevent regions from moving around;
+    // otherwise they will consume the recovered.edits
+    master.balanceSwitch(false);
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs = rsts.get(0).getRegionServer();
+    Path rootdir = FSUtils.getRootDir(conf);
+    final Path logDir = new Path(rootdir,
+        HLog.getHLogDirectoryName(hrs.getServerName()));
+
+    installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+        "table", "family", 40);
+    byte[] table = Bytes.toBytes("table");
+    List<HRegionInfo> regions = hrs.getOnlineRegions();
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaRegion() || region.isRootRegion()) {
+        it.remove();
+      }
+    }
+    makeHLog(hrs.getWAL(), regions, "table",
+        NUM_LOG_LINES, 100);
+
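+    // split this regionserver's logs through the distributed split log manager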
+    slm.splitLogDistributed(logDir);
+
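+    // each region should have exactly one recovered.edits file with its share of the edits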
+    int count = 0;
+    for (HRegionInfo hri : regions) {
+
+      Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+      Path editsdir =
+        HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+        hri.getEncodedName()));
+      LOG.debug("checking edits dir " + editsdir);
+      FileStatus[] files = fs.listStatus(editsdir);
+      assertEquals(1, files.length);
+      int c = countHLog(files[0].getPath(), fs, conf);
+      count += c;
+      LOG.info(c + " edits in " + files[0].getPath());
+    }
+    assertEquals(NUM_LOG_LINES, count);
+  }
+
+  @Test
+  public void testWorkerAbort() throws Exception {
+    LOG.info("testWorkerAbort");
+    startCluster(1);
+    final int NUM_LOG_LINES = 10000;
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+    final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs = rsts.get(0).getRegionServer();
+    Path rootdir = FSUtils.getRootDir(conf);
+    final Path logDir = new Path(rootdir,
+        HLog.getHLogDirectoryName(hrs.getServerName()));
+
+    installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+        "table", "family", 40);
+    byte[] table = Bytes.toBytes("table");
+    makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
+        NUM_LOG_LINES, 100);
+
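+    // abort the lone regionserver as soon as it acquires the split task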
+    new Thread() {
+      public void run() {
+        waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+        for (RegionServerThread rst : rsts) {
+          rst.getRegionServer().abort("testing");
+        }
+      }
+    }.start();
+    // slm.splitLogDistributed(logDir);
+    FileStatus[] logfiles = fs.listStatus(logDir);
+    TaskBatch batch = new TaskBatch();
+    slm.installTask(logfiles[0].getPath().toString(), batch);
+    // like waitForCounter, but wait for any one of the three worker counters below
+    long curt = System.currentTimeMillis();
+    long endt = curt + 30000;
+    while (curt < endt) {
+      if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + 
+          tot_wkr_final_transistion_failed.get()) == 0) {
+        Thread.yield();
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(1, (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+            tot_wkr_final_transistion_failed.get()));
+        return;
+      }
+    }
+    assertEquals(1, batch.done);
+    // fail("region server completed the split before aborting");
+    return;
+  }
+
+  HTable installTable(ZooKeeperWatcher zkw, String tname, String fname,
+      int nrs ) throws Exception {
+    // Create a table with regions
+    byte [] table = Bytes.toBytes(tname);
+    byte [] family = Bytes.toBytes(fname);
+    LOG.info("Creating table with " + nrs + " regions");
+    HTable ht = TEST_UTIL.createTable(table, family);
+    int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs);
+    assertEquals(nrs, numRegions);
+    LOG.info("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    // disable-enable cycle to get rid of table's dead regions left behind
+    // by createMultiRegions
+    LOG.debug("Disabling table\n");
+    TEST_UTIL.getHBaseAdmin().disableTable(table);
+    LOG.debug("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    NavigableSet<String> regions = getAllOnlineRegions(cluster);
+    LOG.debug("Verifying only catalog regions are assigned\n");
+    if (regions.size() != 2) {
+      for (String oregion : regions)
+        LOG.debug("Region still online: " + oregion);
+    }
+    assertEquals(2, regions.size());
+    LOG.debug("Enabling table\n");
+    TEST_UTIL.getHBaseAdmin().enableTable(table);
+    LOG.debug("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
+    regions = getAllOnlineRegions(cluster);
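+    // numRegions user regions plus the -ROOT- and .META. catalog regions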
+    assertEquals(numRegions + 2, regions.size());
+    return ht;
+  }
+
+  void populateDataInTable(int nrows, String fname) throws Exception {
+    byte [] family = Bytes.toBytes(fname);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    assertEquals(NUM_RS, rsts.size());
+
+    for (RegionServerThread rst : rsts) {
+      HRegionServer hrs = rst.getRegionServer();
+      List<HRegionInfo> hris = hrs.getOnlineRegions();
+      for (HRegionInfo hri : hris) {
+        if (hri.isMetaRegion() || hri.isRootRegion()) {
+          continue;
+        }
+        LOG.debug("adding data to rs = " + rst.getName() +
+            " region = "+ hri.getRegionNameAsString());
+        HRegion region = hrs.getOnlineRegion(hri.getRegionName());
+        assertTrue(region != null);
+        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
+      }
+    }
+  }
+
+  public void makeHLog(HLog log,
+      List<HRegionInfo> hris, String tname,
+      int num_edits, int edit_size) throws IOException {
+
+    byte[] table = Bytes.toBytes(tname);
+    byte[] value = new byte[edit_size];
+    for (int i = 0; i < edit_size; i++) {
+      value[i] = (byte)('a' + (i % 26));
+    }
+    int n = hris.size();
+    int[] counts = new int[n];
+    int j = 0;
+    for (int i = 0; i < num_edits; i += 1) {
+      WALEdit e = new WALEdit();
+      byte [] row = Bytes.toBytes("r" + Integer.toString(i));
+      byte [] family = Bytes.toBytes("f");
+      byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+      e.add(new KeyValue(row, family, qualifier,
+          System.currentTimeMillis(), value));
+      // LOG.info("Region " + i + ": " + e);
+      j++;
+      log.append(hris.get(j % n), table, e, System.currentTimeMillis());
+      counts[j % n] += 1;
+      // if ((i % 8096) == 0) {
+        // log.sync();
+      //  }
+    }
+    log.sync();
+    log.close();
+    for (int i = 0; i < n; i++) {
+      LOG.info("region " + hris.get(i).getRegionNameAsString() +
+          " has " + counts[i] + " edits");
+    }
+    return;
+  }
+
+  private int countHLog(Path log, FileSystem fs, Configuration conf)
+  throws IOException {
+    int count = 0;
+    HLog.Reader in = HLog.getReader(fs, log, conf);
+    while (in.next() != null) {
+      count++;
+    }
+    return count;
+  }
+
+  private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
+  throws KeeperException, InterruptedException {
+    ZKAssign.blockUntilNoRIT(zkw);
+    master.assignmentManager.waitUntilNoRegionsInTransition(60000);
+  }
+
+  private void blockUntilRIT(ZooKeeperWatcher zkw)
+  throws KeeperException, InterruptedException {
+    ZKAssign.blockUntilRIT(zkw);
+  }
+
+  private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
+      byte [] ...families)
+  throws IOException {
+    for(int i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
+      for(byte [] family : families) {
+        put.add(family, qf, null);
+      }
+      region.put(put);
+    }
+  }
+
+  private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) {
+    NavigableSet<String> online = new TreeSet<String>();
+    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+      for (HRegionInfo region : rst.getRegionServer().getOnlineRegions()) {
+        online.add(region.getRegionNameAsString());
+      }
+    }
+    return online;
+  }
+
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
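+    // busy-wait up to timems for ctr to move off oldval, then assert it reached newval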
+    long curt = System.currentTimeMillis();
+    long endt = curt + timems;
+    while (curt < endt) {
+      if (ctr.get() == oldval) {
+        Thread.yield();
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(newval, ctr.get());
+        return;
+      }
+    }
+    assertTrue("waitForCounter timed out", false);
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1094662&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Mon Apr 18 17:16:15 2011
@@ -0,0 +1,432 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestSplitLogManager {
+  private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
+  static {
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+  }
+
+  private ZooKeeperWatcher zkw;
+  private static boolean stopped = false;
+  private SplitLogManager slm;
+  private Configuration conf;
+
+  private final static HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+
+  static Stoppable stopper = new Stoppable() {
+    @Override
+    public void stop(String why) {
+      stopped = true;
+    }
+
+    @Override
+    public boolean isStopped() {
+      return stopped;
+    }
+
+  };
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  @Before
+  public void setup() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    conf = TEST_UTIL.getConfiguration();
+    zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests", null);
+    ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
+    ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
+    assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
+    LOG.debug(zkw.baseZNode + " created");
+    ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
+    assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
+    LOG.debug(zkw.splitLogZNode + " created");
+
+    stopped = false;
+    resetCounters();
+  }
+
+  @After
+  public void teardown() throws IOException, KeeperException {
+    stopper.stop("");
+    slm.stop();
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
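+    // poll up to timems for ctr to move off oldval, then assert it reached newval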
+    long curt = System.currentTimeMillis();
+    long endt = curt + timems;
+    while (curt < endt) {
+      if (ctr.get() == oldval) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
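+          // ignore the interrupt and keep polling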
+        }
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(newval, ctr.get());
+        return;
+      }
+    }
+    fail("waitForCounter timed out");
+  }
+
+  private int numRescanPresent() throws KeeperException {
+    int num = 0;
+    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
+    for (String node : nodes) {
+      if (ZKSplitLog.isRescanNode(zkw, ZKSplitLog.getNodeName(zkw, node))) {
+        num++;
+      }
+    }
+    return num;
+  }
+
+  private void setRescanNodeDone(int count) throws KeeperException {
+    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
+    for (String node : nodes) {
+      if (ZKSplitLog.isRescanNode(zkw, ZKSplitLog.getNodeName(zkw, node))) {
+        ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, node),
+            TaskState.TASK_DONE.get("some-worker"));
+        count--;
+      }
+    }
+    assertEquals(0, count);
+  }
+
+  private String submitTaskAndWait(TaskBatch batch, String name)
+  throws KeeperException, InterruptedException {
+    String tasknode = ZKSplitLog.getNodeName(zkw, name);
+    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
+    zkw.registerListener(listener);
+    ZKUtil.watchAndCheckExists(zkw, tasknode);
+
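+    // install the task; the manager should queue the znode creation immediately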
+    slm.installTask("foo", batch);
+    assertEquals(1, batch.installed);
+    assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
+    assertEquals(1L, tot_mgr_node_create_queued.get());
+
+    LOG.debug("waiting for task node creation");
+    listener.waitForCreation();
+    LOG.debug("task created");
+    return tasknode;
+  }
+
+  /**
+   * Test whether the split log manager correctly creates a task in zookeeper
+   * @throws Exception
+   */
+  @Test
+  public void testTaskCreation() throws Exception {
+    LOG.info("TestTaskCreation - test the creation of a task in zk");
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo");
+
+    byte[] data = ZKUtil.getData(zkw, tasknode);
+    LOG.info("Task node created " + new String(data));
+    assertTrue(TaskState.TASK_UNASSIGNED.equals(data, "dummy-master"));
+  }
+
+  @Test
+  public void testOrphanTaskAcquisition() throws Exception {
+    LOG.info("TestOrphanTaskAcquisition");
+
+    String tasknode = ZKSplitLog.getNodeName(zkw, "orphan");
+    zkw.getZooKeeper().create(tasknode,
+        TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+    to = to + 2 * 100;
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+    Task task = slm.findOrCreateOrphanTask(tasknode);
+    assertTrue(task.isOrphan());
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 100);
+    assertFalse(task.isUnassigned());
+    long curt = System.currentTimeMillis();
+    assertTrue((task.last_update <= curt) &&
+        (task.last_update > (curt - 1000)));
+    LOG.info("waiting for manager to resubmit the orphan task");
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+    assertTrue(task.isUnassigned());
+    waitForCounter(tot_mgr_rescan, 0, 1, to + 100);
+    assertEquals(1, numRescanPresent());
+  }
+
+  @Test
+  public void testUnassignedOrphan() throws Exception {
+    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
+        " startup");
+    String tasknode = ZKSplitLog.getNodeName(zkw, "orphan");
+    //create an unassigned orphan task
+    zkw.getZooKeeper().create(tasknode,
+        TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+    int version = ZKUtil.checkExists(zkw, tasknode);
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+    Task task = slm.findOrCreateOrphanTask(tasknode);
+    assertTrue(task.isOrphan());
+    assertTrue(task.isUnassigned());
+    // wait for RESCAN node to be created
+    waitForCounter(tot_mgr_rescan, 0, 1, 500);
+    Task task2 = slm.findOrCreateOrphanTask(tasknode);
+    assertTrue(task == task2);
+    LOG.debug("task = " + task);
+    assertEquals(1L, tot_mgr_resubmit.get());
+    assertEquals(1, task.incarnation);
+    assertEquals(0, task.unforcedResubmits);
+    assertTrue(task.isOrphan());
+    assertTrue(task.isUnassigned());
+    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
+    assertEquals(1, numRescanPresent());
+  }
+
+  @Test
+  public void testMultipleResubmits() throws Exception {
+    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+    to = to + 2 * 100;
+
+    conf.setInt("hbase.splitlog.max.resubmit", 2);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo");
+    int version = ZKUtil.checkExists(zkw, tasknode);
+
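+    // simulate worker1 acquiring the task and then dying (no further heartbeats)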
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+    int version1 = ZKUtil.checkExists(zkw, tasknode);
+    assertTrue(version1 > version);
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker2"));
+    waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
+    waitForCounter(tot_mgr_resubmit, 1, 2, to + 100);
+    int version2 = ZKUtil.checkExists(zkw, tasknode);
+    assertTrue(version2 > version1);
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3"));
+    waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
+    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100);
+    assertEquals(2, numRescanPresent());
+    Thread.sleep(to + 100);
+    assertEquals(2L, tot_mgr_resubmit.get());
+  }
+
+  @Test
+  public void testRescanCleanup() throws Exception {
+    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+    to = to + 2 * 100;
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo");
+    int version = ZKUtil.checkExists(zkw, tasknode);
+
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+    int version1 = ZKUtil.checkExists(zkw, tasknode);
+    assertTrue(version1 > version);
+    assertEquals(1, numRescanPresent());
+    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
+    assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+        taskstate));
+
+    setRescanNodeDone(1);
+
+    waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+
+    assertEquals(0, numRescanPresent());
+    return;
+  }
+
+  @Test
+  public void testTaskDone() throws Exception {
+    LOG.info("TestTaskDone - cleanup task node once in DONE state");
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+    String tasknode = submitTaskAndWait(batch, "foo");
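+    // simulate a worker completing the task successfully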
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker"));
+    synchronized (batch) {
+      while (batch.installed != batch.done) {
+        batch.wait();
+      }
+    }
+    waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
+  }
+
+  @Test
+  public void testTaskErr() throws Exception {
+    LOG.info("TestTaskErr - cleanup task node once in ERR state");
+
+    conf.setInt("hbase.splitlog.max.resubmit", 0);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo");
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker"));
+    synchronized (batch) {
+      while (batch.installed != batch.error) {
+        batch.wait();
+      }
+    }
+    waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
+    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
+  }
+
+  @Test
+  public void testTaskResigned() throws Exception {
+    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+    String tasknode = submitTaskAndWait(batch, "foo");
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker"));
+    int version = ZKUtil.checkExists(zkw, tasknode);
+
+    waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
+    int version1 = ZKUtil.checkExists(zkw, tasknode);
+    assertTrue(version1 > version);
+    assertEquals(1, numRescanPresent());
+
+    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
+    assertTrue(Arrays.equals(taskstate,
+        TaskState.TASK_UNASSIGNED.get("dummy-master")));
+  }
+
+  @Test
+  public void testUnassignedTimeout() throws Exception {
+    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
+        " resubmit");
+
+    // create an orphan task in OWNED state
+    String tasknode1 = ZKSplitLog.getNodeName(zkw, "orphan");
+    zkw.getZooKeeper().create(tasknode1,
+        TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+
+    // submit another task which will stay in unassigned mode
+    TaskBatch batch = new TaskBatch();
+    submitTaskAndWait(batch, "foo");
+
+    // keep heartbeating the orphan owned node every 100ms for 3 * to ms
+    for (int i = 0; i < (3 * to)/100; i++) {
+      Thread.sleep(100);
+      ZKUtil.setData(zkw, tasknode1,
+          TaskState.TASK_OWNED.get("dummy-worker"));
+    }
+
+    // since not all of the nodes in the system are unassigned, the
+    // unassigned timeout must not have kicked in
+    assertEquals(0, numRescanPresent());
+
+    // since we have stopped heartbeating the owned node, it should
+    // now get resubmitted
+    LOG.info("waiting for manager to resubmit the orphan task");
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 500);
+    assertEquals(1, numRescanPresent());
+
+    // now all the nodes are unassigned. manager should post another rescan
+    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500);
+    assertEquals(2, numRescanPresent());
+  }
+}