Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/25 00:30:32 UTC

svn commit: r1188419 [2/3] - in /hbase/branches/0.89-fb: ./ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regions...

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Oct 24 22:30:31 2011
@@ -1606,7 +1606,7 @@ public class HLog implements Syncable {
     return pattern.matcher(filename).matches();
   }
 
-  private static Path getHLogArchivePath(Path oldLogDir, Path p) {
+  static Path getHLogArchivePath(Path oldLogDir, Path p) {
     return new Path(oldLogDir, p.getName());
   }
 
@@ -1756,7 +1756,8 @@ public class HLog implements Syncable {
           WriterAndPath wap = logWriters.get(region);
           for (Entry logEntry: entries) {
             if (wap == null) {
-              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir,
+                  true);
               if (fs.exists(regionedits)) {
                 LOG.warn("Found existing old edits file. It could be the " +
                   "result of a previous failed split attempt. Deleting " +
@@ -1799,54 +1800,66 @@ public class HLog implements Syncable {
    * @param conf
    * @throws IOException
    */
-  private static void archiveLogs(final List<Path> corruptedLogs,
-    final List<Path> processedLogs, final Path oldLogDir,
-    final FileSystem fs, final Configuration conf)
+  static void archiveLogs(final List<Path> corruptedLogs,
+      final List<Path> processedLogs, final Path oldLogDir,
+      final FileSystem fs, final Configuration conf)
   throws IOException{
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR),
       conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
 
-    fs.mkdirs(corruptDir);
-    fs.mkdirs(oldLogDir);
-
+    if (!fs.exists(corruptDir) && !fs.mkdirs(corruptDir)) {
+      LOG.warn("Unable to mkdir " + corruptDir);
+    }
+    if (!fs.exists(oldLogDir) && !fs.mkdirs(oldLogDir)) {
+      LOG.warn("Unable to mkdir " + oldLogDir);
+    }
     for (Path corrupted: corruptedLogs) {
       Path p = new Path(corruptDir, corrupted.getName());
-      LOG.info("Moving corrupted log " + corrupted + " to " + p);
-      fs.rename(corrupted, p);
+      if (!fs.rename(corrupted, p)) {
+        LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+      } else {
+        LOG.info("Moving corrupted log " + corrupted + " to " + p);
+      }
     }
 
     for (Path p: processedLogs) {
       Path newPath = getHLogArchivePath(oldLogDir, p);
-      fs.rename(p, newPath);
-      LOG.info("Archived processed log " + p + " to " + newPath);
+      if (!fs.rename(p, newPath)) {
+        LOG.warn("Unable to move processed log " + p + " to " + newPath);
+      } else {
+        LOG.info("Archived processed log " + p + " to " + newPath);
+      }
     }
   }
 
-  /*
+  /**
    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
    * <code>logEntry</code> named for the sequenceid in the passed
-   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
-   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
-   * creating it if necessary.
+   * <code>logEntry</code>: e.g.
+   * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures
+   * existence of RECOVERED_EDITS_DIR under the region creating it if necessary.
+   *
    * @param fs
    * @param logEntry
    * @param rootDir HBase root dir.
+   * @param isCreate if true, create the directory; otherwise just return the name
    * @return Path to file into which to dump split log edits.
    * @throws IOException
    */
-  private static Path getRegionSplitEditsPath(final FileSystem fs,
-      final Entry logEntry, final Path rootDir)
+  static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir, boolean isCreate)
   throws IOException {
-    Path tableDir = HTableDescriptor.getTableDir(rootDir,
-      logEntry.getKey().getTablename());
+    Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
+        .getTablename());
     Path regiondir = HRegion.getRegionDir(tableDir,
-      HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+        HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
     Path dir = getRegionDirRecoveredEditsDir(regiondir);
-    if (!fs.exists(dir)) {
+
+    if (isCreate && !fs.exists(dir)) {
       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
     }
-    return new Path(dir,
-      formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
+    return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
+        .getLogSeqNum()));
    }
 
   static String formatRecoveredEditsFileName(final long seqid) {

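For context, the archiveLogs() change above replaces fire-and-forget fs.rename()/fs.mkdirs() calls with variants that check the boolean result and warn on failure. A minimal standalone sketch of that idiom (the class and method names here are hypothetical, not part of this commit):

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckedFsOps {
      private static final Log LOG = LogFactory.getLog(CheckedFsOps.class);

      // Returns true only if src ended up at dst. A false return is logged
      // and treated as non-fatal, mirroring how archiveLogs() handles a
      // failed archive of an already-processed log.
      static boolean renameLogged(FileSystem fs, Path src, Path dst)
          throws IOException {
        if (!fs.rename(src, dst)) {
          LOG.warn("Unable to move " + src + " to " + dst);
          return false;
        }
        LOG.info("Moved " + src + " to " + dst);
        return true;
      }
    }
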
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Mon Oct 24 22:30:31 2011
@@ -179,6 +179,30 @@ public class HLogKey implements Writable
     return result;
   }
 
+  /**
+   * Drop this instance's tablename byte array and instead hold a reference to
+   * the provided tablename. This is not meant to be a general purpose setter -
+   * it's only used to collapse references to conserve memory.
+   */
+  void internTableName(byte[] tablename) {
+    // We should not use this as a setter - only to swap
+    // in a new reference to the same table name.
+    assert Bytes.equals(tablename, this.tablename);
+    this.tablename = tablename;
+  }
+
+  /**
+   * Drop this instance's region name byte array and instead hold a reference to
+   * the provided region name. This is not meant to be a general purpose setter
+   * - it's only used to collapse references to conserve memory.
+   */
+  void internEncodedRegionName(byte[] encodedRegionName) {
+    // We should not use this as a setter - only to swap
+    // in a new reference to the same region name.
+    assert Bytes.equals(this.regionName, encodedRegionName);
+    this.regionName = encodedRegionName;
+  }
+
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.regionName);
     Bytes.writeByteArray(out, this.tablename);

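The two intern methods above let a caller collapse the many duplicate table and region name arrays produced while reading a log. A minimal sketch of such a caller (the KeyInterner name is hypothetical); a splitter loop could then call key.internTableName(interner.intern(key.getTablename())) so equal names share one array:

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyInterner {
      // One canonical byte[] per distinct name, keyed by content.
      private final Map<byte[], byte[]> names =
          new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);

      // Returns the canonical array with the same content as 'name',
      // registering it on first sight.
      byte[] intern(byte[] name) {
        byte[] canonical = names.get(name);
        if (canonical == null) {
          names.put(name, name);
          canonical = name;
        }
        return canonical;
      }
    }
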
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,555 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.io.MultipleIOException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * This class is responsible for splitting up a bunch of regionserver commit
+ * log files that are no longer being written to, into new files, one per
+ * region, for the region to replay on startup. The old log files are deleted
+ * when finished.
+ */
+public class HLogSplitter {
+
+  private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
+
+
+  static final Log LOG = LogFactory.getLog(HLogSplitter.class);
+
+
+  // Parameters for split process
+  protected final Path rootDir;
+  protected final Path srcDir;
+  protected final Path oldLogDir;
+  protected final FileSystem fs;
+  protected final Configuration conf;
+
+  // If an exception is thrown by one of the other threads, it will be
+  // stored here.
+  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+  // Wait/notify for when data has been produced by the reader thread,
+  // consumed by the writer thread, or an exception occurred
+  Object dataAvailable = new Object();
+
+  private MonitoredTask status;
+
+
+  /**
+   * Create a new HLogSplitter using the given {@link Configuration} and the
+   * <code>hbase.hlog.splitter.impl</code> property to derived the instance
+   * class to use.
+   *
+   * @param rootDir hbase directory
+   * @param srcDir logs directory
+   * @param oldLogDir directory where processed logs are archived to
+   * @param logfiles the list of log files to split
+   */
+  public static HLogSplitter createLogSplitter(Configuration conf,
+      final Path rootDir, final Path srcDir,
+      Path oldLogDir, final FileSystem fs)  {
+
+    @SuppressWarnings("unchecked")
+    Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
+        .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
+    try {
+       Constructor<? extends HLogSplitter> constructor =
+         splitterClass.getConstructor(
+          Configuration.class, // conf
+          Path.class, // rootDir
+          Path.class, // srcDir
+          Path.class, // oldLogDir
+          FileSystem.class); // fs
+      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException(e);
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(e);
+    } catch (SecurityException e) {
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
+      Path oldLogDir, FileSystem fs) {
+    this.conf = conf;
+    this.rootDir = rootDir;
+    this.srcDir = srcDir;
+    this.oldLogDir = oldLogDir;
+    this.fs = fs;
+  }
+
+  /**
+   * Splits an HLog file into a temporary staging area. tmpname is used to build
+   * the name of the staging area where the recovered-edits will be separated
+   * out by region and stored.
+   * <p>
+   * If the log file has N regions then N recovered.edits files will be
+   * produced. There is no buffering in this code. Instead it relies on the
+   * buffering in the SequenceFileWriter.
+   * <p>
+   * @param rootDir
+   * @param tmpname
+   * @param logfile
+   * @param fs
+   * @param conf
+   * @param reporter
+   * @return false if it is interrupted by the progress-able.
+   * @throws IOException
+   */
+  static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
+      FileStatus logfile, FileSystem fs,
+      Configuration conf, CancelableProgressable reporter) throws IOException {
+    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
+        fs);
+    return s.splitLogFileToTemp(logfile, tmpname, reporter);
+  }
+
+  public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
+      CancelableProgressable reporter)  throws IOException {
+    final Map<byte[], Object> logWriters = Collections.
+    synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
+    boolean isCorrupted = false;
+
+    Preconditions.checkState(status == null);
+    status = TaskMonitor.get().createStatus(
+        "Splitting log file " + logfile.getPath() +
+        "into a temporary staging area.");
+
+    Object BAD_WRITER = new Object();
+
+    boolean progress_failed = false;
+
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
+    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
+    // How often to send a progress report (default 1/2 master timeout)
+    int period = conf.getInt("hbase.splitlog.report.period",
+        conf.getInt("hbase.splitlog.manager.timeout",
+            ZKSplitLog.DEFAULT_TIMEOUT) / 2);
+    Path logPath = logfile.getPath();
+    long logLength = logfile.getLen();
+    LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+    status.setStatus("Opening log file");
+    Reader in = null;
+    try {
+      in = getReader(fs, logfile, conf, skipErrors);
+    } catch (CorruptedLogFileException e) {
+      LOG.warn("Could not get reader, corrupted log file " + logPath, e);
+      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      isCorrupted = true;
+    }
+    if (in == null) {
+      status.markComplete("Was nothing to split in log file");
+      LOG.warn("Nothing to split in log file " + logPath);
+      return true;
+    }
+    long t = EnvironmentEdgeManager.currentTimeMillis();
+    long last_report_at = t;
+    if (reporter != null && reporter.progress() == false) {
+      status.markComplete("Failed: reporter.progress asked us to terminate");
+      return false;
+    }
+    int editsCount = 0;
+    Entry entry;
+    try {
+      while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+        byte[] region = entry.getKey().getRegionName();
+        Object o = logWriters.get(region);
+        if (o == BAD_WRITER) {
+          continue;
+        }
+        WriterAndPath wap = (WriterAndPath)o;
+        if (wap == null) {
+          wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
+          if (wap == null) {
+            // ignore edits from this region. It doesn't exist anymore.
+            // It was probably already split.
+            logWriters.put(region, BAD_WRITER);
+            continue;
+          } else {
+            logWriters.put(region, wap);
+          }
+        }
+        wap.w.append(entry);
+        editsCount++;
+        if (editsCount % interval == 0) {
+          status.setStatus("Split " + editsCount + " edits");
+          long t1 = EnvironmentEdgeManager.currentTimeMillis();
+          if ((t1 - last_report_at) > period) {
+            last_report_at = t1;
+            if (reporter != null && reporter.progress() == false) {
+              status.setStatus("Failed: reporter.progress asked us to terminate");
+              progress_failed = true;
+              return false;
+            }
+          }
+        }
+      }
+    } catch (CorruptedLogFileException e) {
+      LOG.warn("Could not parse, corrupted log file " + logPath, e);
+      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      isCorrupted = true;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      throw e;
+    } finally {
+      int n = 0;
+      for (Object o : logWriters.values()) {
+        long t1 = EnvironmentEdgeManager.currentTimeMillis();
+        if ((t1 - last_report_at) > period) {
+          last_report_at = t1;
+          if ((progress_failed == false) && (reporter != null) &&
+              (reporter.progress() == false)) {
+            progress_failed = true;
+          }
+        }
+        if (o == BAD_WRITER) {
+          continue;
+        }
+        n++;
+        WriterAndPath wap = (WriterAndPath)o;
+        wap.w.close();
+        LOG.debug("Closed " + wap.p);
+      }
+      String msg = "processed " + editsCount + " edits across " + n +
+          " regions" + " threw away edits for " + (logWriters.size() - n) +
+          " regions" + " log file = " + logPath + " is corrupted = " +
+          isCorrupted + " progress interrupted? = " + progress_failed;
+      LOG.info(msg);
+      status.markComplete(msg);
+    }
+    if (progress_failed) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Completes the work done by splitLogFileToTemp by moving the
+   * recovered.edits from the staging area to the respective region server's
+   * directories.
+   * <p>
+   * It is invoked by SplitLogManager once it knows that one of the
+   * SplitLogWorkers has completed the splitLogFileToTemp() part. If the
+   * master crashes then this function might get called multiple times.
+   * <p>
+   * @param tmpname
+   * @param conf
+   * @throws IOException
+   */
+  public static void moveRecoveredEditsFromTemp(String tmpname,
+      String logfile, Configuration conf)
+  throws IOException{
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
+  }
+
+  public static void moveRecoveredEditsFromTemp(String tmpname,
+      Path rootdir, Path oldLogDir,
+      String logfile, Configuration conf)
+  throws IOException{
+    List<Path> processedLogs = new ArrayList<Path>();
+    List<Path> corruptedLogs = new ArrayList<Path>();
+    FileSystem fs;
+    fs = rootdir.getFileSystem(conf);
+    Path logPath = new Path(logfile);
+    if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+      corruptedLogs.add(logPath);
+    } else {
+      processedLogs.add(logPath);
+    }
+    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
+    List<FileStatus> files = listAll(fs, stagingDir);
+    for (FileStatus f : files) {
+      Path src = f.getPath();
+      Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
+      if (ZKSplitLog.isCorruptFlagFile(dst)) {
+        continue;
+      }
+      if (fs.exists(dst)) {
+        fs.delete(dst, false);
+      } else {
+        Path dstdir = dst.getParent();
+        if (!fs.exists(dstdir)) {
+          if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+        }
+      }
+      fs.rename(src, dst);
+      LOG.debug(" moved " + src + " => " + dst);
+    }
+    HLog.archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    fs.delete(stagingDir, true);
+    return;
+  }
+
+  private static List<FileStatus> listAll(FileSystem fs, Path dir)
+  throws IOException {
+    List<FileStatus> fset = new ArrayList<FileStatus>(100);
+    FileStatus [] files = fs.listStatus(dir);
+    if (files != null) {
+      for (FileStatus f : files) {
+        if (f.isDir()) {
+          fset.addAll(listAll(fs, f.getPath()));
+        } else {
+          fset.add(f);
+        }
+      }
+    }
+    return fset;
+  }
+
+  /**
+   * Create a new {@link Reader} for reading logs to split.
+   *
+   * @param fs
+   * @param file
+   * @param conf
+   * @return A new Reader instance
+   * @throws IOException
+   * @throws CorruptedLogFileException
+   */
+  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
+      boolean skipErrors)
+      throws IOException, CorruptedLogFileException {
+    Path path = file.getPath();
+    long length = file.getLen();
+    Reader in;
+
+
+    // Check for possibly empty file. With appends, currently Hadoop reports a
+    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+    // HDFS-878 is committed.
+    if (length <= 0) {
+      LOG.warn("File " + path + " might be still open, length is 0");
+    }
+
+    try {
+      recoverFileLease(fs, path, conf);
+      try {
+        in = getReader(fs, path, conf);
+      } catch (EOFException e) {
+        if (length <= 0) {
+          // TODO should we ignore an empty, not-last log file if skip.errors
+          // is false? Either way, the caller should decide what to do. E.g.
+          // ignore if this is the last log in sequence.
+          // TODO is this scenario still possible if the log has been
+          // recovered (i.e. closed)?
+          LOG.warn("Could not open " + path + " for reading. File is empty", e);
+          return null;
+        } else {
+          // EOFException being ignored
+          return null;
+        }
+      }
+    } catch (IOException e) {
+      if (!skipErrors) {
+        throw e;
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Could not open hlog " +
+            path + " ignoring");
+      t.initCause(e);
+      throw t;
+    }
+    return in;
+  }
+
+  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  throws CorruptedLogFileException, IOException {
+    try {
+      return in.next();
+    } catch (EOFException eof) {
+      // truncated files are expected if a RS crashes (see HBASE-2643)
+      LOG.info("EOF from hlog " + path + ".  continuing");
+      return null;
+    } catch (IOException e) {
+      // If the IOE resulted from bad file format,
+      // then this problem is idempotent and retrying won't help
+      if (e.getCause() != null &&
+          (e.getCause() instanceof ParseException ||
+           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
+        LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
+           + path + ".  continuing");
+        return null;
+      }
+      if (!skipErrors) {
+        throw e;
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
+            " while parsing hlog " + path + ". Marking as corrupted");
+      t.initCause(e);
+      throw t;
+    }
+  }
+
+
+
+  /**
+   * Create a new {@link Writer} for writing log splits.
+   */
+  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+      throws IOException {
+    return HLog.createWriter(fs, logfile, conf);
+  }
+
+  /**
+   * Create a new {@link Reader} for reading logs to split.
+   */
+  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
+      throws IOException {
+    return HLog.getReader(fs, curLogFile, conf);
+  }
+
+
+  private WriterAndPath createWAP(byte[] region, Entry entry,
+      Path rootdir, String tmpname, FileSystem fs, Configuration conf)
+  throws IOException {
+    Path regionedits = HLog.getRegionSplitEditsPath(fs, entry, rootdir,
+        tmpname == null);
+    if (regionedits == null) {
+      return null;
+    }
+    if ((tmpname == null) && fs.exists(regionedits)) {
+      LOG.warn("Found existing old edits file. It could be the "
+          + "result of a previous failed split attempt. Deleting "
+          + regionedits + ", length="
+          + fs.getFileStatus(regionedits).getLen());
+      if (!fs.delete(regionedits, false)) {
+        LOG.warn("Failed delete of old " + regionedits);
+      }
+    }
+    Path editsfile;
+    if (tmpname != null) {
+      // During distributed log splitting the output by each
+      // SplitLogWorker is written to a temporary area.
+      editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
+    } else {
+      editsfile = regionedits;
+    }
+    Writer w = createWriter(fs, editsfile, conf);
+    LOG.debug("Creating writer path=" + editsfile + " region="
+        + Bytes.toStringBinary(region));
+    return (new WriterAndPath(editsfile, w));
+  }
+
+  Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
+    List<String> components = new ArrayList<String>(10);
+    do {
+      components.add(edits.getName());
+      edits = edits.getParent();
+    } while (edits.depth() > rootdir.depth());
+    Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
+    for (int i = components.size() - 1; i >= 0; i--) {
+      ret = new Path(ret, components.get(i));
+    }
+    try {
+      if (fs.exists(ret)) {
+        LOG.warn("Found existing old temporary edits file. It could be the "
+            + "result of a previous failed split attempt. Deleting "
+            + ret + ", length="
+            + fs.getFileStatus(ret).getLen());
+        if (!fs.delete(ret, false)) {
+          LOG.warn("Failed delete of old " + ret);
+        }
+      }
+      Path dir = ret.getParent();
+      if (!fs.exists(dir)) {
+        if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not prepare temp staging area ", e);
+      // ignore, exceptions will be thrown elsewhere
+    }
+    return ret;
+  }
+
+
+
+
+  /**
+   *  Private data structure that wraps a Writer and its Path,
+   *  also collecting statistics about the data written to this
+   *  output.
+   */
+  private final static class WriterAndPath {
+    final Path p;
+    final Writer w;
+
+    WriterAndPath(final Path p, final Writer w) {
+      this.p = p;
+      this.w = w;
+    }
+  }
+
+  static class CorruptedLogFileException extends Exception {
+    private static final long serialVersionUID = 1L;
+    CorruptedLogFileException(String s) {
+      super(s);
+    }
+  }
+}

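Taken together, the new class supports a two-phase distributed split: a worker writes per-region recovered.edits into a staging area, then the master moves them into place. A minimal usage sketch under that reading (the tmpname value and the null reporter are illustrative; a real worker derives tmpname via ZKSplitLog.getSplitLogDirTmpComponent() and passes a progress reporter):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class SplitSketch {
      static void splitOne(Configuration conf, String tmpname,
          FileStatus logfile) throws Exception {
        Path rootDir = FSUtils.getRootDir(conf);
        FileSystem fs = rootDir.getFileSystem(conf);
        // Phase 1 (worker): split the log into splitlog/<tmpname>;
        // a null reporter means the split is never asked to cancel.
        boolean done = HLogSplitter.splitLogFileToTemp(rootDir, tmpname,
            logfile, fs, conf, null);
        if (done) {
          // Phase 2 (master): move staged recovered.edits next to their
          // regions and archive (or quarantine) the source log.
          HLogSplitter.moveRecoveredEditsFromTemp(tmpname,
              logfile.getPath().toString(), conf);
        }
      }
    }
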
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+public class OrphanHLogAfterSplitException extends IOException {
+
+  /**
+   * Create this exception without a message
+   */
+  public OrphanHLogAfterSplitException() {
+    super();
+  }
+
+  /**
+   * Create this exception with a message
+   * @param message why it failed
+   */
+  public OrphanHLogAfterSplitException(String message) {
+    super(message);
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Mon Oct 24 22:30:31 2011
@@ -28,6 +28,7 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.Writable;
@@ -66,7 +67,7 @@ import org.apache.hadoop.io.Writable;
  * is an old style KeyValue or the new style WALEdit.
  *
  */
-public class WALEdit implements Writable {
+public class WALEdit implements Writable, HeapSize {
 
   private final int VERSION_2 = -1;
 
@@ -172,4 +173,17 @@ public class WALEdit implements Writable
     return sb.toString();
   }
 
+  @Override
+  public long heapSize() {
+    long ret = 0;
+    for (KeyValue kv : kvs) {
+      ret += kv.heapSize();
+    }
+    if (scopes != null) {
+      ret += ClassSize.TREEMAP;
+      ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
+    }
+    return ret;
+  }
+
 }

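Implementing HeapSize gives callers a cheap way to account for the memory a buffered edit holds. A minimal sketch of the kind of bookkeeping this enables (the buffer class and the 64 MB threshold are hypothetical, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public class EditBuffer {
      // Illustrative cap; tune to taste.
      private static final long FLUSH_THRESHOLD = 64L * 1024 * 1024;

      private final List<WALEdit> buffer = new ArrayList<WALEdit>();
      private long heapUsed = 0;

      // Buffers the edit; returns true when the caller should flush.
      boolean add(WALEdit edit) {
        buffer.add(edit);
        heapUsed += edit.heapSize();
        return heapUsed >= FLUSH_THRESHOLD;
      }
    }
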
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/**
+ * An interface similar to {@link org.apache.hadoop.util.Progressable} that
+ * returns a boolean to support canceling the operation.
+ * <p>
+ * Used for updating the OPENING znode during log replay on region open.
+ */
+public interface CancelableProgressable {
+
+  /**
+   * Report progress.  Returns true if operations should continue, false if the
+   * operation should be canceled and rolled back.
+   * @return whether to continue (true) or cancel (false) the operation
+   */
+  public boolean progress();
+
+}

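A minimal sketch of implementing the interface: keep reporting progress until a deadline passes, then ask the long-running operation to cancel (the class name and timeout are illustrative):

    import org.apache.hadoop.hbase.util.CancelableProgressable;

    public class DeadlineProgress implements CancelableProgressable {
      private final long deadlineMillis;

      public DeadlineProgress(long timeoutMillis) {
        this.deadlineMillis = System.currentTimeMillis() + timeoutMillis;
      }

      @Override
      public boolean progress() {
        // false tells the operation (e.g. log replay during region open)
        // to cancel and roll back.
        return System.currentTimeMillis() < deadlineMillis;
      }
    }
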
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Mon Oct 24 22:30:31 2011
@@ -1,25 +1,23 @@
 package org.apache.hadoop.hbase.zookeeper;
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.ZooKeeper.States;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
@@ -396,6 +394,13 @@ public class RecoverableZooKeeper {
     }
   }
 
+  public void asyncCreate(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode, final AsyncCallback.StringCallback cb,
+      final Object ctx) {
+    byte[] newData = appendMetaData(data);
+    zk.create(path, newData, acl, createMode, cb, ctx);
+  }
+
   /**
    * <p>
    * NONSEQUENTIAL create is idempotent operation.
@@ -423,11 +428,11 @@ public class RecoverableZooKeeper {
     switch (createMode) {
       case EPHEMERAL:
       case PERSISTENT:
-        return createNonSequential(path, newData, acl, createMode);
+      return createNonSequential(path, newData, acl, createMode);
 
       case EPHEMERAL_SEQUENTIAL:
       case PERSISTENT_SEQUENTIAL:
-        return createSequential(path, newData, acl, createMode);
+      return createSequential(path, newData, acl, createMode);
 
       default:
         throw new IllegalArgumentException("Unrecognized CreateMode: " +
@@ -541,7 +546,7 @@ public class RecoverableZooKeeper {
     return null;
   }
 
-  public byte[] removeMetaData(byte[] data) {
+  public static byte[] removeMetaData(byte[] data) {
     if(data == null || data.length == 0) {
       return data;
     }
@@ -625,4 +630,9 @@ public class RecoverableZooKeeper {
     return lockChildren;
   }
 
+  @Override
+  public String toString() {
+    return "RZK{identifier=>" + identifier + "}";
+  }
+
 }

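The new asyncCreate() hands creation off to ZooKeeper's async API so a caller can queue many znode creates without blocking on each round trip. A minimal usage sketch (the latch and path handling are illustrative, not from this commit):

    import java.util.concurrent.CountDownLatch;

    import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
    import org.apache.zookeeper.AsyncCallback;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;

    public class AsyncCreateSketch {
      static void createNode(RecoverableZooKeeper rzk, String path,
          byte[] data) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);
        rzk.asyncCreate(path, data, Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT,
            new AsyncCallback.StringCallback() {
              public void processResult(int rc, String p, Object ctx,
                  String name) {
                // rc is a ZooKeeper result code; 0 (OK) means created.
                done.countDown();
              }
            }, null /* ctx */);
        done.await(); // or keep queueing further creates before waiting
      }
    }
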
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,273 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Common methods and attributes used by {@link SplitLogManager} and
+ * {@link SplitLogWorker}
+ */
+public class ZKSplitLog {
+  private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
+
+  public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
+  public static final int DEFAULT_ZK_RETRIES = 3;
+  public static final int DEFAULT_MAX_RESUBMIT = 3;
+  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
+
+  /**
+   * Gets the full path node name for the log file being split
+   * @param zkw zk reference
+   * @param filename log file name (only the basename)
+   */
+  public static String getEncodedNodeName(ZooKeeperWrapper zkw,
+      String filename) {
+    return zkw.getZNode(zkw.splitLogZNode, encode(filename));
+  }
+
+  public static String getFileName(String node) {
+    String basename = node.substring(node.lastIndexOf('/') + 1);
+    return decode(basename);
+  }
+
+
+  public static String encode(String s) {
+    try {
+      return URLEncoder.encode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("URLENCODER doesn't support UTF-8");
+    }
+  }
+
+  public static String decode(String s) {
+    try {
+      return URLDecoder.decode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("URLDecoder doesn't support UTF-8");
+    }
+  }
+
+  public static String getRescanNode(ZooKeeperWrapper zkw) {
+    return zkw.getZNode(zkw.splitLogZNode, "RESCAN");
+  }
+
+  public static boolean isRescanNode(ZooKeeperWrapper zkw, String path) {
+    String prefix = getRescanNode(zkw);
+    if (path.length() < prefix.length()) {
+      return false;
+    }
+    for (int i = 0; i < prefix.length(); i++) {
+      if (prefix.charAt(i) != path.charAt(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static boolean isTaskPath(ZooKeeperWrapper zkw, String path) {
+    String dirname = path.substring(0, path.lastIndexOf('/'));
+    return dirname.equals(zkw.splitLogZNode);
+  }
+
+  public static enum TaskState {
+    TASK_UNASSIGNED("unassigned"),
+    TASK_OWNED("owned"),
+    TASK_RESIGNED("resigned"),
+    TASK_DONE("done"),
+    TASK_ERR("err");
+
+    private final byte[] state;
+    private TaskState(String s) {
+      state = s.getBytes();
+    }
+
+    public byte[] get(String serverName) {
+      return (Bytes.add(state, " ".getBytes(), serverName.getBytes()));
+    }
+
+    public String getWriterName(byte[] data) {
+      String str = Bytes.toString(data);
+      return str.substring(str.indexOf(' ') + 1);
+    }
+
+
+    /**
+     * @param s
+     * @return True if {@link #state} is a prefix of s. False otherwise.
+     */
+    public boolean equals(byte[] s) {
+      if (s.length < state.length) {
+        return (false);
+      }
+      for (int i = 0; i < state.length; i++) {
+        if (state[i] != s[i]) {
+          return (false);
+        }
+      }
+      return (true);
+    }
+
+    public boolean equals(byte[] s, String serverName) {
+      return (Arrays.equals(s, get(serverName)));
+    }
+    @Override
+    public String toString() {
+      return new String(state);
+    }
+  }
+
+  public static Path getSplitLogDir(Path rootdir, String tmpname) {
+    return new Path(new Path(rootdir, "splitlog"), tmpname);
+  }
+
+  public static Path stripSplitLogTempDir(Path rootdir, Path file) {
+    int skipDepth = rootdir.depth() + 2;
+    List<String> components = new ArrayList<String>(10);
+    do {
+      components.add(file.getName());
+      file = file.getParent();
+    } while (file.depth() > skipDepth);
+    Path ret = rootdir;
+    for (int i = components.size() - 1; i >= 0; i--) {
+      ret = new Path(ret, components.get(i));
+    }
+    return ret;
+  }
+
+  public static String getSplitLogDirTmpComponent(String worker, String file) {
+    return (worker + "_" + ZKSplitLog.encode(file));
+  }
+
+  public static void markCorrupted(Path rootdir, String tmpname, FileSystem fs)
+  throws IOException {
+    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    try {
+      fs.createNewFile(file);
+    } catch (IOException e) {
+      LOG.error("Could not flag a log file as corrupted. Failed to create "
+          + file);
+      throw e;
+    }
+  }
+
+  public static boolean isCorrupted(Path rootdir, String tmpname,
+      FileSystem fs) throws IOException {
+    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    return fs.exists(file);
+  }
+
+  public static boolean isCorruptFlagFile(Path file) {
+    return file.getName().equals("corrupt");
+  }
+
+
+  public static class Counters {
+    //SplitLogManager counters
+    public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_batch_success =
+      new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
+    public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
+    public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_threshold_reached =
+      new AtomicLong(0);
+    public static AtomicLong tot_mgr_missing_state_in_delete =
+      new AtomicLong(0);
+    public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
+    public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
+    public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
+    public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
+    public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_dead_server_task =
+      new AtomicLong(0);
+
+
+
+    // SplitLogWorker counters
+    public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_exception =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_owned =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
+      new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
+    public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
+    public static AtomicLong tot_wkr_final_transistion_failed =
+      new AtomicLong(0);
+
+    public static void resetCounters() throws Exception {
+      Class<?> cl = (new Counters()).getClass();
+      Field[] flds = cl.getDeclaredFields();
+      for (Field fld : flds) {
+        ((AtomicLong)fld.get(null)).set(0);
+      }
+    }
+  }
+}

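The TaskState payloads above encode a task's state as a prefix of the znode data, followed by a space and the owning server's name. A small sketch of writing and checking one (the server name is hypothetical):

    import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;

    public class TaskStateSketch {
      public static void main(String[] args) {
        byte[] data = TaskState.TASK_OWNED.get("rs1.example.com,60020");
        // equals(byte[]) matches on the state prefix alone...
        boolean owned = TaskState.TASK_OWNED.equals(data);
        // ...while equals(byte[], String) also requires the server name.
        boolean ownedByUs =
            TaskState.TASK_OWNED.equals(data, "rs1.example.com,60020");
        System.out.println(owned + " " + ownedByUs + " "
            + TaskState.TASK_OWNED.getWriterName(data));
      }
    }
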
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Mon Oct 24 22:30:31 2011
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.zookeeper;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.InterruptedIOException;
@@ -29,7 +31,10 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,9 +44,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.Calendar;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,14 +54,16 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.Stat;
 
@@ -141,6 +145,10 @@ public class ZooKeeperWrapper implements
    */
   private final String rgnsInTransitZNode;
   /*
+   * ZNode used for log splitting work assignment
+   */
+  public final String splitLogZNode;
+  /*
   * List of ZNodes in the unassigned region that are already being watched
    */
   private Set<String> unassignedZNodesWatched = new HashSet<String>();
@@ -201,6 +209,7 @@ public class ZooKeeperWrapper implements
     String masterAddressZNodeName = conf.get("zookeeper.znode.master", "master");
     String stateZNodeName      = conf.get("zookeeper.znode.state", "shutdown");
     String regionsInTransitZNodeName = conf.get("zookeeper.znode.regionInTransition", "UNASSIGNED");
+    String splitLogZNodeName   = conf.get("zookeeper.znode.splitlog", "splitlog");
 
     rootRegionZNode     = getZNode(parentZNode, rootServerZNodeName);
     rsZNode             = getZNode(parentZNode, rsZNodeName);
@@ -209,8 +218,9 @@ public class ZooKeeperWrapper implements
     clusterStateZNode   = getZNode(parentZNode, stateZNodeName);
     int retryNum = conf.getInt("zookeeper.connection.retry.num", 6);
     int retryFreq = conf.getInt("zookeeper.connection.retry.freq", 1000);
-    zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout", 
+    zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
         1000);
+    splitLogZNode       = getZNode(parentZNode, splitLogZNodeName);
     connectToZk(retryNum,retryFreq);
   }
 
@@ -1002,22 +1012,33 @@ public class ZooKeeperWrapper implements
   }
 
   public byte[] getData(String parentZNode, String znode) {
-    return getDataAndWatch(parentZNode, znode, null);
+    return getData(parentZNode, znode, null);
+  }
+
+  public byte[] getData(String parentZNode, String znode, Stat stat) {
+    return getDataAndWatch(parentZNode, znode, null, stat);
   }
 
-  public byte[] getDataAndWatch(String parentZNode,
-                                String znode, Watcher watcher) {
+  public byte[] getDataAndWatch(String parentZNode, String znode,
+      Watcher watcher) {
+    return getDataAndWatch(parentZNode, znode, watcher, null);
+  }
+
+  public byte[] getDataAndWatch(String parentZNode, String znode,
+      Watcher watcher, Stat stat) {
     byte[] data = null;
     try {
-      String path = joinPath(parentZNode, znode);
-      // TODO: ZK-REFACTOR: remove existance check?
+      String path = getZNode(parentZNode, znode);
+      // TODO: ZK-REFACTOR: remove existence check?
       if (checkExistenceOf(path)) {
-        data = recoverableZK.getData(path, watcher, null);
+        data = recoverableZK.getData(path, watcher, stat);
       }
     } catch (KeeperException e) {
-      LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
+      LOG.warn("<" + instanceName + ">" + "Failed to read " + znode
+          + " znode in ZooKeeper: " + e);
     } catch (InterruptedException e) {
-      LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
+      LOG.warn("<" + instanceName + ">" + "Failed to read " + znode
+          + " znode in ZooKeeper: " + e);
     }
     return data;
   }
@@ -1164,10 +1185,10 @@ public class ZooKeeperWrapper implements
       LOG.error("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
       return null;
     }
-      // watch the znode for deletion, data change, creation of children
-      if(watch) {
-        watchZNode(zNodeName);
-      }
+    // watch the znode for deletion, data change, creation of children
+    if (watch) {
+      watchZNode(zNodeName);
+    }
 
     return fullyQualifiedZNodeName;
   }
@@ -1371,6 +1392,289 @@ public class ZooKeeperWrapper implements
     return newNodes;
   }
 
+  /**
+   * Check if the specified node exists. Sets no watches.
+   *
+   * Returns the node's version if it exists, or -1 if it does not. Throws an
+   * exception on an unexpected zookeeper error.
+   *
+   * @param znode path of node to check
+   * @return version of the node if it exists, -1 if it does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public int checkExists(String znode)
+  throws KeeperException {
+    try {
+      Stat s = recoverableZK.exists(znode, null);
+      return s != null ? s.getVersion() : -1;
+    } catch (KeeperException e) {
+      LOG.warn(recoverableZK + " Unable to set watcher on znode (" + znode + ")", e);
+      keeperException(e);
+      return -1;
+    } catch (InterruptedException e) {
+      LOG.warn(recoverableZK + " Unable to set watcher on znode (" + znode + ")", e);
+      interruptedException(e);
+      return -1;
+    }
+  }
+
+  /**
+   * Watch the specified znode for delete/create/change events.  The watcher is
+   * set whether or not the node exists.  If the node already exists, the method
+   * returns true.  If the node does not exist, the method returns false.
+   *
+   * @param znode path of node to watch
+   * @return true if znode exists, false if does not exist or error
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public boolean watchAndCheckExists(String znode)
+  throws KeeperException {
+    try {
+      Stat s = recoverableZK.exists(znode, this);
+      LOG.debug(this + " Set watcher on existing znode " + znode);
+      return s != null;
+    } catch (KeeperException e) {
+      LOG.warn(this + " Unable to set watcher on znode " + znode, e);
+      keeperException(e);
+      return false;
+    } catch (InterruptedException e) {
+      LOG.warn(this + " Unable to set watcher on znode " + znode, e);
+      interruptedException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Lists the children of the specified znode without setting any watches.
+   *
+   * Used to list the currently online regionservers and their addresses.
+   *
+   * Sets no watches at all; this method is best effort.
+   *
+   * Returns an empty list if the node has no children.  Returns null if the
+   * parent node itself does not exist.
+   *
+   * @param znode node to get children of as addresses
+   * @return list of data of children of specified znode, empty if no children,
+   *         null if parent does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public List<String> listChildrenNoWatch(String znode)
+  throws KeeperException {
+    List<String> children = null;
+    try {
+      // List the children without watching
+      children = recoverableZK.getChildren(znode, null);
+    } catch(KeeperException.NoNodeException nne) {
+      return null;
+    } catch(InterruptedException ie) {
+      interruptedException(ie);
+    }
+    return children;
+  }
+
+  /**
+   * Lists the children znodes of the specified znode.  Also sets a watch on
+   * the specified znode which will capture a NodeDeleted event on the specified
+   * znode as well as NodeChildrenChanged if any children of the specified znode
+   * are created or deleted.
+   *
+   * Returns null if the specified node does not exist.  Otherwise returns a
+   * list of children of the specified node.  If the node exists but it has no
+   * children, an empty list will be returned.
+   *
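+   * <p>Illustrative usage, a sketch only (the path is assumed):
+   * <pre>
+   *   List&lt;String&gt; tasks =
+   *       zkw.listChildrenAndWatchForNewChildren("/hbase/splitlog");
+   *   if (tasks == null) {
+   *     // parent znode does not exist; no children watch is in place
+   *   }
+   * </pre>
+   *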
+   * @param znode path of node to list and watch children of
+   * @return list of children of the specified node, an empty list if the node
+   *          exists but has no children, and null if the node does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public List<String> listChildrenAndWatchForNewChildren(String znode)
+  throws KeeperException {
+    try {
+      List<String> children = recoverableZK.getChildren(znode, this);
+      return children;
+    } catch(KeeperException.NoNodeException ke) {
+      LOG.debug(recoverableZK + " Unable to list children of znode " + znode +
+          " because node does not exist (not an error)");
+      return null;
+    } catch (KeeperException e) {
+      LOG.warn(recoverableZK + " Unable to list children of znode " + znode, e);
+      keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      LOG.warn(recoverableZK + " Unable to list children of znode " + znode, e);
+      interruptedException(e);
+      return null;
+    }
+  }
+
+  /**
+   * Sets the data of the existing znode to be the specified data.  The node
+   * must exist but no checks are done on the existing data or version.
+   *
+   * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+   *
+   * <p>No watches are set but setting data will trigger other watchers of this
+   * node.
+   *
+   * <p>If there is another problem, a KeeperException will be thrown.
+   *
+   * @param znode path of node
+   * @param data data to set for node
+   * @return true if the data was set
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public boolean setData(String znode, byte[] data)
+  throws KeeperException, KeeperException.NoNodeException {
+    return setData(znode, data, -1);
+  }
+
+  /**
+   * Sets the data of the existing znode to be the specified data.  Ensures that
+   * the current data has the specified expected version.
+   *
+   * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+   *
+   * <p>If there is a version mismatch, the method returns false.
+   *
+   * <p>No watches are set but setting data will trigger other watchers of this
+   * node.
+   *
+   * <p>If there is another problem, a KeeperException will be thrown.
+   *
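+   * <p>Illustrative compare-and-set usage, a sketch only ({@code znode} and
+   * {@code newData} are assumed):
+   * <pre>
+   *   int version = zkw.checkExists(znode);
+   *   if (version != -1 &amp;&amp; zkw.setData(znode, newData, version)) {
+   *     // data replaced only if no one else wrote in between
+   *   }
+   * </pre>
+   *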
+   * @param znode path of node
+   * @param data data to set for node
+   * @param expectedVersion version expected when setting data
+   * @return true if data set, false if version mismatch
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public boolean setData(String znode, byte [] data, int expectedVersion)
+  throws KeeperException, KeeperException.NoNodeException {
+    try {
+      return recoverableZK.setData(znode, data, expectedVersion) != null;
+    } catch (InterruptedException e) {
+      interruptedException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Sets the data of the existing znode to be the specified data.  Ensures that
+   * the current data has the specified expected version.
+   *
+   * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+   *
+   * <p>If there is a version mismatch, the method returns null.
+   *
+   * <p>No watches are set but setting data will trigger other watchers of this
+   * node.
+   *
+   * <p>If there is another problem, a KeeperException will be thrown.
+   *
+   * @param znode path of node
+   * @param data data to set for node
+   * @param expectedVersion version expected when setting data
+   * @return the Stat returned by setData, or null on version mismatch
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public Stat setDataGetStat(String znode, byte [] data, int expectedVersion)
+  throws KeeperException, KeeperException.NoNodeException, InterruptedException {
+    return recoverableZK.setData(znode, data, expectedVersion);
+  }
+
+  /**
+   * Async creates the specified node with the specified data.
+   *
+   * <p>Errors, including node-already-exists, are reported through the result
+   * code passed to the callback; nothing is thrown by this method.
+   *
+   * <p>The node is created with open access (OPEN_ACL_UNSAFE) and the given
+   * create mode.
+   *
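+   * <p>Illustrative usage, a sketch only (the callback body is assumed):
+   * <pre>
+   *   zkw.asyncCreate(znode, data, CreateMode.PERSISTENT,
+   *       new AsyncCallback.StringCallback() {
+   *         public void processResult(int rc, String path, Object ctx,
+   *             String name) {
+   *           // rc carries the result code, e.g. NODEEXISTS if the
+   *           // node was already there
+   *         }
+   *       }, null);
+   * </pre>
+   *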
+   * @param znode path of node to create
+   * @param data data of node to create
+   * @param createMode persistence mode of the node to create
+   * @param cb callback to invoke with the result of the create
+   * @param ctx context object to pass through to the callback
+   */
+  public void asyncCreate(String znode, byte[] data, CreateMode createMode,
+      final AsyncCallback.StringCallback cb, final Object ctx) {
+    recoverableZK.asyncCreate(znode, data, Ids.OPEN_ACL_UNSAFE,
+       createMode, cb, ctx);
+  }
+
+  /**
+   * Delete the specified node and all of its children.
+   *
+   * Sets no watches.  Throws all exceptions except those encountered while
+   * deleting children.
+   */
+  public void deleteNodeRecursively(String node)
+  throws KeeperException {
+    try {
+      List<String> children = listChildrenNoWatch(node);
+      if (children != null && !children.isEmpty()) {
+        for (String child : children) {
+          deleteNodeRecursively(joinPath(node, child));
+        }
+      }
+      recoverableZK.delete(node, -1);
+    } catch(InterruptedException ie) {
+      interruptedException(ie);
+    }
+  }
+
+  /**
+   * Delete all the children of the specified node but not the node itself.
+   *
+   * Sets no watches.  Throws all exceptions except those encountered while
+   * deleting children.
+   */
+  public void deleteChildrenRecursively(String node)
+  throws KeeperException {
+    List<String> children = listChildrenNoWatch(node);
+    if (children != null && !children.isEmpty()) {
+      for (String child : children) {
+        deleteNodeRecursively(joinPath(node, child));
+      }
+    }
+  }
+
+  /**
+   * Handles InterruptedExceptions in client calls.
+   * <p>
+   * This may be temporary but for now this gives one place to deal with these.
+   * <p>
+   * TODO: Currently this method only logs the exception and restores the
+   * thread's interrupt status. Is this ever expected to happen? Should we
+   * abort instead, or log at WARN?
+   * <p>
+   *
+   * @param ie the InterruptedException to handle
+   */
+  public void interruptedException(InterruptedException ie) {
+    LOG.debug(recoverableZK
+        + " Received InterruptedException, doing nothing here", ie);
+    // At least preserve the interrupt status.
+    Thread.currentThread().interrupt();
+    // no-op
+  }
+
+  /**
+   * Handles KeeperExceptions in client calls.
+   * <p>
+   * This may be temporary but for now this gives one place to deal with these.
+   * <p>
+   * TODO: Currently this method rethrows the exception to let the caller
+   * handle it.
+   * <p>
+   *
+   * @param ke the KeeperException to handle
+   * @throws KeeperException
+   */
+  public void keeperException(KeeperException ke) throws KeeperException {
+    LOG.error(recoverableZK
+        + " Received unexpected KeeperException, re-throwing exception", ke);
+    throw ke;
+  }
+
   public static class ZNodePathAndData {
     private String zNodePath;
     private byte[] data;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Mon Oct 24 22:30:31 2011
@@ -418,6 +418,27 @@ public class HBaseTestingUtility {
     return createTable(tableName, new byte[][]{family});
   }
 
+
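+  /**
+   * Create a pre-split table with the given column families, keeping up to
+   * numVersions versions per cell and splitting the key range
+   * [startKey, endKey] into numRegions regions.
+   * @param tableName name of table to create
+   * @param families column families of the table
+   * @param numVersions maximum number of versions to keep
+   * @param startKey start key of the first user region
+   * @param endKey end key of the last user region
+   * @param numRegions number of regions to pre-split the table into
+   * @return an HTable instance for the created table
+   * @throws IOException
+   */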
+  public HTable createTable(byte[] tableName, byte[][] families,
+      int numVersions, byte[] startKey, byte[] endKey, int numRegions)
+  throws IOException{
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    for (byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
+          HColumnDescriptor.DEFAULT_COMPRESSION,
+          HColumnDescriptor.DEFAULT_IN_MEMORY,
+          HColumnDescriptor.DEFAULT_BLOCKCACHE,
+          Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
+          HColumnDescriptor.DEFAULT_BLOOMFILTER,
+          HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
+      desc.addFamily(hcd);
+    }
+    (new HBaseAdmin(getConfiguration())).createTable(desc, startKey,
+        endKey, numRegions);
+    return new HTable(getConfiguration(), tableName);
+  }
+
   /**
    * Create a table.
    * @param tableName

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,412 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestDistributedLogSplitting {
+  private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);
+  static {
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+  }
+
+  final int NUM_RS = 6;
+
+  MiniHBaseCluster cluster;
+  HMaster master;
+  Configuration conf;
+  HBaseTestingUtility TEST_UTIL;
+
+  @Before
+  public void before() throws Exception {
+
+  }
+
+  private void startCluster(int num_rs) throws Exception {
+    ZKSplitLog.Counters.resetCounters();
+    LOG.info("Starting cluster");
+    conf = HBaseConfiguration.create();
+    conf.setInt("hbase.regionserver.info.port", -1);
+    conf.setFloat("hbase.regions.slop", 100.0f); // no load balancing
+    conf.setBoolean("hbase.master.distributed.log.splitting", true);
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    cluster = TEST_UTIL.startMiniCluster(num_rs);
+    int live_rs;
+    while ((live_rs = cluster.getLiveRegionServerThreads().size()) < num_rs) {
+      LOG.info(live_rs + " out of " + num_rs + " started, waiting ...");
+      Thread.sleep(500);
+    }
+    master = cluster.getMaster();
+    while (!master.isActiveMaster() || master.isClosed() ||
+        !master.getIsSplitLogAfterStartupDone()) {
+      LOG.info("waiting for master to be ready");
+      Thread.sleep(500);
+    }
+  }
+
+  @After
+  public void after() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testThreeRSAbort() throws Exception {
+    LOG.info("testThreeRSAbort");
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_ROWS_PER_REGION = 100;
+
+    startCluster(NUM_RS);
+
+    HTable ht = installTable("table", "family", NUM_REGIONS_TO_CREATE);
+    populateDataInTable(NUM_ROWS_PER_REGION, "family");
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    assertEquals(NUM_RS, rsts.size());
+    rsts.get(0).getRegionServer().abort("testing");
+    rsts.get(1).getRegionServer().abort("testing");
+    rsts.get(2).getRegionServer().abort("testing");
+
+    long start = EnvironmentEdgeManager.currentTimeMillis();
+    while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
+      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
+        fail("timed out waiting for the aborted regionservers to stop");
+      }
+      Thread.sleep(200);
+    }
+
+    start = EnvironmentEdgeManager.currentTimeMillis();
+    while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 2)) {
+      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
+        fail("timed out waiting for all regions to come back online");
+      }
+      Thread.sleep(200);
+    }
+
+    assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
+        TEST_UTIL.countRows(ht));
+  }
+
+  @Test(expected=OrphanHLogAfterSplitException.class)
+  public void testOrphanLogCreation() throws Exception {
+    LOG.info("testOrphanLogCreation");
+    startCluster(NUM_RS);
+    final SplitLogManager slm = master.getSplitLogManager();
+    final FileSystem fs = master.getFileSystem();
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs = rsts.get(0).getRegionServer();
+    Path rootdir = FSUtils.getRootDir(conf);
+    final Path logDir = new Path(rootdir,
+        HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
+
+    installTable("table", "family", 40);
+
+    makeHLog(hrs.getLog(), hrs.getOnlineRegions(), "table",
+        1000, 100);
+
+    new Thread() {
+      public void run() {
+        int i = 0;
+        while (true) {
+          try {
+            while (ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
+                0) {
+              Thread.yield();
+            }
+            // keep creating distinct orphan files while the split runs
+            fs.createNewFile(new Path(logDir, "foo" + i++));
+          } catch (Exception e) {
+            LOG.debug("file creation failed", e);
+            return;
+          }
+        }
+      }
+    }.start();
+    slm.splitLogDistributed(logDir);
+    FileStatus[] files = fs.listStatus(logDir);
+    if (files != null) {
+      for (FileStatus file : files) {
+        LOG.debug("file still there " + file.getPath());
+      }
+    }
+  }
+
+  @Test
+  public void testRecoveredEdits() throws Exception {
+    LOG.info("testRecoveredEdits");
+    startCluster(NUM_RS);
+    final int NUM_LOG_LINES = 1000;
+    final SplitLogManager slm = master.getSplitLogManager();
+    FileSystem fs = master.getFileSystem();
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs = rsts.get(0).getRegionServer();
+    Path rootdir = FSUtils.getRootDir(conf);
+    final Path logDir = new Path(rootdir,
+        HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
+
+    installTable("table", "family", 40);
+    byte[] table = Bytes.toBytes("table");
+    Collection<HRegion> regions = new LinkedList<HRegion>(hrs.getOnlineRegions());
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegion> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegion region = it.next();
+      HRegionInfo hri = region.getRegionInfo();
+      if (hri.isMetaRegion() || hri.isRootRegion()) {
+        it.remove();
+      }
+    }
+    makeHLog(hrs.getLog(), regions, "table",
+        NUM_LOG_LINES, 100);
+
+    slm.splitLogDistributed(logDir);
+
+    int count = 0;
+    for (HRegion rgn : regions) {
+
+      Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+      Path editsdir =
+        HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+        rgn.getRegionInfo().getEncodedName()));
+      LOG.debug("checking edits dir " + editsdir);
+      FileStatus[] files = fs.listStatus(editsdir);
+      assertEquals(1, files.length);
+      int c = countHLog(files[0].getPath(), fs, conf);
+      count += c;
+      LOG.info(c + " edits in " + files[0].getPath());
+    }
+    assertEquals(NUM_LOG_LINES, count);
+  }
+
+  @Test
+  public void testWorkerAbort() throws Exception {
+    LOG.info("testWorkerAbort");
+    startCluster(1);
+    final int NUM_LOG_LINES = 10000;
+    final SplitLogManager slm = master.getSplitLogManager();
+    FileSystem fs = master.getFileSystem();
+
+    final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs = rsts.get(0).getRegionServer();
+    Path rootdir = FSUtils.getRootDir(conf);
+    final Path logDir = new Path(rootdir,
+        HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
+
+    installTable("table", "family", 40);
+    byte[] table = Bytes.toBytes("table");
+    makeHLog(hrs.getLog(), hrs.getOnlineRegions(), "table",
+        NUM_LOG_LINES, 100);
+
+    new Thread() {
+      public void run() {
+        waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+        for (RegionServerThread rst : rsts) {
+          rst.getRegionServer().abort("testing");
+        }
+      }
+    }.start();
+    // slm.splitLogDistributed(logDir);
+    FileStatus[] logfiles = fs.listStatus(logDir);
+    TaskBatch batch = new TaskBatch();
+    slm.installTask(logfiles[0].getPath().toString(), batch);
+    // like waitForCounter() but for any one of the three worker counters
+    long curt = System.currentTimeMillis();
+    long endt = curt + 30000;
+    while (curt < endt) {
+      if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+          tot_wkr_final_transistion_failed.get()) == 0) {
+        Thread.yield();
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(1, (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+            tot_wkr_final_transistion_failed.get()));
+        return;
+      }
+    }
+    assertEquals(1, batch.done);
+    // fail("region server completed the split before aborting");
+    return;
+  }
+
+  HTable installTable(String tname, String fname, int nrs) throws Exception {
+    // Create a table with regions
+    byte [] table = Bytes.toBytes(tname);
+    byte [] family = Bytes.toBytes(fname);
+    LOG.info("Creating table with " + nrs + " regions");
+    HTable ht = TEST_UTIL.createTable(table, new byte[][]{family},
+        3, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), nrs);
+    NavigableSet<String> regions = getAllOnlineRegions(cluster);
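+    // expect the nrs user regions plus the ROOT and META regions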
+    assertEquals(nrs + 2, regions.size());
+    return ht;
+  }
+
+  void populateDataInTable(int nrows, String fname) throws Exception {
+    byte [] family = Bytes.toBytes(fname);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    assertEquals(NUM_RS, rsts.size());
+
+    for (RegionServerThread rst : rsts) {
+      HRegionServer hrs = rst.getRegionServer();
+      Collection<HRegion> regions = hrs.getOnlineRegions();
+      for (HRegion r : regions) {
+        HRegionInfo hri = r.getRegionInfo();
+        if (hri.isMetaRegion() || hri.isRootRegion()) {
+          continue;
+        }
+        LOG.debug("adding data to rs = " + rst.getName() +
+            " region = "+ r.getRegionNameAsString());
+        HRegion region = hrs.getOnlineRegion(r.getRegionName());
+        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
+      }
+    }
+  }
+
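+  /**
+   * Write num_edits edits, each carrying an edit_size-byte value, to the
+   * given log, distributing them round-robin across the regions, then sync
+   * and close the log.
+   */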
+  public void makeHLog(HLog log,
+      Collection<HRegion> rgns, String tname,
+      int num_edits, int edit_size) throws IOException {
+
+    List<HRegion> regions = new ArrayList<HRegion>(rgns);
+    byte[] table = Bytes.toBytes(tname);
+    byte[] value = new byte[edit_size];
+    for (int i = 0; i < edit_size; i++) {
+      value[i] = (byte)('a' + (i % 26));
+    }
+    int n = regions.size();
+    int[] counts = new int[n];
+    int j = 0;
+    for (int i = 0; i < num_edits; i += 1) {
+      WALEdit e = new WALEdit();
+      byte [] row = Bytes.toBytes("r" + Integer.toString(i));
+      byte [] family = Bytes.toBytes("f");
+      byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+      e.add(new KeyValue(row, family, qualifier,
+          System.currentTimeMillis(), value));
+      // LOG.info("Region " + i + ": " + e);
+      j++;
+      log.append(regions.get(j % n).getRegionInfo(), table, e, System.currentTimeMillis());
+      counts[j % n] += 1;
+      // if ((i % 8096) == 0) {
+        // log.sync();
+      //  }
+    }
+    log.sync();
+    log.close();
+    for (int i = 0; i < n; i++) {
+      LOG.info("region " + regions.get(i).getRegionNameAsString() +
+          " has " + counts[i] + " edits");
+    }
+    return;
+  }
+
+  private int countHLog(Path log, FileSystem fs, Configuration conf)
+  throws IOException {
+    int count = 0;
+    HLog.Reader in = HLog.getReader(fs, log, conf);
+    while (in.next() != null) {
+      count++;
+    }
+    return count;
+  }
+
+  private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
+      byte [] ...families)
+  throws IOException {
+    for(int i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
+      for(byte [] family : families) {
+        put.add(family, qf, null);
+      }
+      region.put(put);
+    }
+  }
+
+  private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) {
+    NavigableSet<String> online = new TreeSet<String>();
+    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+      for (HRegion region : rst.getRegionServer().getOnlineRegions()) {
+        online.add(region.getRegionNameAsString());
+      }
+    }
+    return online;
+  }
+
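+  /**
+   * Wait up to timems for ctr to move off oldval; once it changes, assert
+   * that the new value is newval. Fails the test if the counter never moves.
+   */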
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    long curt = System.currentTimeMillis();
+    long endt = curt + timems;
+    while (curt < endt) {
+      if (ctr.get() == oldval) {
+        Thread.yield();
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(newval, ctr.get());
+        return;
+      }
+    }
+    fail("waiting timed out after " + timems + " ms");
+  }
+}