You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/25 00:30:32 UTC

svn commit: r1188419 [1/3] - in /hbase/branches/0.89-fb: ./ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regions...

Author: nspiegelberg
Date: Mon Oct 24 22:30:31 2011
New Revision: 1188419

URL: http://svn.apache.org/viewvc?rev=1188419&view=rev
Log:
distributed log splitting port to hbase-trunk

**** 89 MASTER ONLY ****

Reviewers: kannan, jgray, liyintang
Reviewed By: kannan
CC: hbase-eng@lists, pkhemani, kannan

Differential Revision: 339729
Task ID: 694483

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
Modified:
    hbase/branches/0.89-fb/pom.xml
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogSplitter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java

Modified: hbase/branches/0.89-fb/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/pom.xml?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/pom.xml (original)
+++ hbase/branches/0.89-fb/pom.xml Mon Oct 24 22:30:31 2011
@@ -226,7 +226,7 @@
         </plugin>
         <plugin>
           <artifactId>maven-surefire-plugin</artifactId>
-          <version>2.9</version>
+          <version>2.8</version>
           <configuration>
             <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
             <argLine>-Xmx512m</argLine>
@@ -925,7 +925,7 @@
       -->
       <plugin>
         <artifactId>maven-surefire-report-plugin</artifactId>
-        <version>2.9</version>
+        <version>2.8</version>
       </plugin>
       <!--
       <plugin>

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogSplitter.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogSplitter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogSplitter.java Mon Oct 24 22:30:31 2011
@@ -49,6 +49,7 @@ import org.mortbay.log.Log;
  * to the mappers, there are no reducers. Each mapper processes the
  * directory it receives as input.
  */
+@Deprecated
 public class HLogSplitter {
   final static String NAME = "splitlogs";
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Oct 24 22:30:31 2011
@@ -19,21 +19,18 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.lang.reflect.Constructor;
 import java.net.UnknownHostException;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -97,7 +94,9 @@ import org.apache.hadoop.hbase.monitorin
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.Pair;
@@ -152,6 +151,8 @@ public class HMaster extends Thread impl
   private final MasterMetrics metrics;
 
   final Lock splitLogLock = new ReentrantLock();
+  final boolean distributedLogSplitting;
+  SplitLogManager splitLogManager;
 
   // Our zk client.
   private ZooKeeperWrapper zooKeeperWrapper;
@@ -180,7 +181,8 @@ public class HMaster extends Thread impl
   // True if this is the master that started the cluster.
   boolean isClusterStartup;
 
-  private long masterStartupTime = 0;
+  private long masterStartupTime = Long.MAX_VALUE;
+  private AtomicBoolean isSplitLogAfterStartupDone = new AtomicBoolean(false);
   private MapWritable preferredRegionToRegionServerMapping = null;
   private long applyPreferredAssignmentPeriod = 0l;
   private long holdRegionForBestLocalityPeriod = 0l;
@@ -236,6 +238,10 @@ public class HMaster extends Thread impl
     this.fs = FileSystem.get(this.conf);
     checkRootDir(this.rootdir, this.conf, this.fs);
 
+    this.distributedLogSplitting = conf.getBoolean(
+        "hbase.master.distributed.log.splitting", false);
+    this.splitLogManager = null;
+
     // Make sure the region servers can archive their old logs
     this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
     if(!this.fs.exists(this.oldLogDir)) {
@@ -564,6 +570,7 @@ public class HMaster extends Thread impl
       initPreferredAssignment();
       startupStatus.setStatus("Initializing master service threads");
       startServiceThreads();
+      masterStartupTime = System.currentTimeMillis();
       startupStatus.markComplete("Initialization successful");
     } catch (IOException e) {
       LOG.fatal("Unhandled exception. Master quits.", e);
@@ -656,8 +663,6 @@ public class HMaster extends Thread impl
       }
 
     }
-    // get the start time stamp after scanning the dfs
-    masterStartupTime = System.currentTimeMillis();
   }
 
   public static MapWritable getRegionLocalityFromSnapshot(Configuration conf) {
@@ -767,7 +772,6 @@ public class HMaster extends Thread impl
     // Check if this is a fresh start of the cluster
     if (addresses.isEmpty()) {
       LOG.debug("Master fresh start, proceeding with normal startup");
-      splitLogAfterStartup();
       return;
     }
     // Failover case.
@@ -811,7 +815,6 @@ public class HMaster extends Thread impl
     }
     LOG.info("Inspection found " + assignedRegions.size() + " regions, " +
       (isRootRegionAssigned ? "with -ROOT-" : "but -ROOT- was MIA"));
-    splitLogAfterStartup();
   }
 
   /*
@@ -819,60 +822,125 @@ public class HMaster extends Thread impl
    * ad active region server.
    */
   private void splitLogAfterStartup() {
-    Path logsDirPath =
-      new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
-    try {
-      if (!this.fs.exists(logsDirPath)) return;
-    } catch (IOException e) {
-      throw new RuntimeException("Could exists for " + logsDirPath, e);
-    }
-    FileStatus[] logFolders;
     try {
-      logFolders = this.fs.listStatus(logsDirPath);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed listing " + logsDirPath.toString(), e);
-    }
-    if (logFolders == null || logFolders.length == 0) {
-      LOG.debug("No log files to split, proceeding...");
-      return;
+      Path logsDirPath =
+          new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
+      try {
+        if (!this.fs.exists(logsDirPath)) return;
+      } catch (IOException e) {
+        throw new RuntimeException("Could exists for " + logsDirPath, e);
+      }
+      FileStatus[] logFolders;
+      try {
+        logFolders = this.fs.listStatus(logsDirPath);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed listing " + logsDirPath.toString(), e);
+      }
+      if (logFolders == null || logFolders.length == 0) {
+        LOG.debug("No log files to split, proceeding...");
+        return;
+      }
+      List<String> serverNames = new ArrayList<String>();
+      for (FileStatus status : logFolders) {
+        Path logDir = status.getPath();
+        String serverName = logDir.getName();
+        LOG.info("Found log folder : " + serverName);
+        if (this.serverManager.getServerInfo(serverName) == null) {
+          LOG.info("Log folder " + status.getPath() + " doesn't belong " +
+              "to a known region server, splitting");
+          serverNames.add(serverName);
+        } else {
+          LOG.info("Log folder " + status.getPath() +
+              " belongs to an existing region server");
+        }
+      }
+      splitLog(serverNames);
+    } finally {
+      isSplitLogAfterStartupDone.set(true);
     }
-    for (FileStatus status : logFolders) {
-      Path logDir = status.getPath();
-      String serverName = logDir.getName();
-      LOG.info("Found log folder : " + serverName);
-      if(this.serverManager.getServerInfo(serverName) == null) {
-        LOG.info("Log folder doesn't belong " +
-          "to a known region server, splitting");
-        long splitTime = 0, splitSize = 0, splitCount = 0;
+  }
 
-        this.splitLogLock.lock();
+  public boolean getIsSplitLogAfterStartupDone() {
+    return (isSplitLogAfterStartupDone.get());
+  }
+
+  public void splitLog(final String serverName) {
+    List<String> serverNames = new ArrayList<String>();
+    serverNames.add(serverName);
+    splitLog(serverNames);
+  }
+
+  public void splitLog(final List<String> serverNames) {
+    long splitTime = 0, splitLogSize = 0, splitCount = 0;
+    List<String> realServerNames = new ArrayList<String>();
+    List<Path> logDirs = new ArrayList<Path>();
+    for (String serverName : serverNames) {
+      Path logDir = new Path(this.rootdir,
+          HLog.getHLogDirectoryName(serverName));
+      // rename the directory so a rogue RS doesn't create more HLogs
+      if (!serverName.endsWith(HConstants.HLOG_SPLITTING_EXT)) {
+        realServerNames.add(serverName);
+        Path splitDir = new Path(logDir.getParent(), logDir.getName()
+            + HConstants.HLOG_SPLITTING_EXT);
         try {
-          // rename the directory so a rogue RS doesn't create more HLogs
-          if (!serverName.endsWith(HConstants.HLOG_SPLITTING_EXT)) {
-            Path splitDir = new Path(logDir.getParent(),
-                                     logDir.getName()
-                                     + HConstants.HLOG_SPLITTING_EXT);
-            if (!this.fs.rename(logDir, splitDir)) {
-              throw new IOException("Failed fs.rename of " + logDir);
-            }
-            logDir = splitDir;
-            LOG.debug("Renamed region directory: " + splitDir);
+          if (!this.fs.rename(logDir, splitDir)) {
+            LOG.error("Failed fs.rename of " + logDir);
           }
-          ContentSummary contentSummary = fs.getContentSummary(logDir);
-          splitCount = contentSummary.getFileCount();
-          splitSize = contentSummary.getSpaceConsumed();
-          HLog.splitLog(this.rootdir, logDir, oldLogDir, this.fs, getConfiguration());
-          splitTime = HLog.lastSplitTime;
-          this.metrics.addSplit(splitTime, splitCount, splitSize );
+        } catch (IOException ioe) {
+          LOG.error("Failed fs.rename of " + logDir, ioe);
+        }
+        logDir = splitDir;
+        LOG.debug("Renamed region directory: " + splitDir);
+      } else {
+        realServerNames.add(serverName.substring(0, serverName.length()
+            - HConstants.HLOG_SPLITTING_EXT.length()));
+      }
+      logDirs.add(logDir);
+      ContentSummary contentSummary;
+      try {
+        contentSummary = fs.getContentSummary(logDir);
+        splitCount += contentSummary.getFileCount();
+        splitLogSize += contentSummary.getSpaceConsumed();
+      } catch (IOException e) {
+        LOG.error("Failed to get file system content summary", e);
+      }
+    }
+    splitTime = EnvironmentEdgeManager.currentTimeMillis();
+    if (distributedLogSplitting) {
+      // handleDeadWorker() is also called in serverManager
+      for (String realServerName : realServerNames) {
+        splitLogManager.handleDeadWorker(realServerName);
+      }
+      try {
+        try {
+          splitLogManager.splitLogDistributed(logDirs);
+        } catch (OrphanHLogAfterSplitException e) {
+          LOG.warn("Retrying distributed splitting for " + serverNames
+              + "because of:", e);
+          splitLogManager.splitLogDistributed(logDirs);
+        }
+      } catch (IOException e) {
+        LOG.error("Failed distributed splitting " + serverNames, e);
+      }
+    } else {
+      // splitLogLock ensures that dead region servers' logs are processed
+      // one at a time
+      for (Path logDir : logDirs) {
+        this.splitLogLock.lock();
+        try {
+          HLog.splitLog(this.rootdir, logDir, oldLogDir, this.fs,
+              getConfiguration());
         } catch (IOException e) {
           LOG.error("Failed splitting " + logDir.toString(), e);
         } finally {
           this.splitLogLock.unlock();
         }
-      } else {
-        LOG.info("Log folder belongs to an existing region server");
       }
     }
+    splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
+    if (this.metrics != null) {
+      this.metrics.addSplit(splitTime, splitCount, splitLogSize);
+    }
   }
 
   /*
@@ -893,12 +961,21 @@ public class HMaster extends Thread impl
         this.infoServer.setAttribute(MASTER, this);
         this.infoServer.start();
       }
-      // Start the server so everything else is running before we start
-      // receiving requests.
+      if (this.distributedLogSplitting) {
+        // splitLogManager must be started before starting rpcServer because
+        // region-servers dying will trigger log splitting
+        this.splitLogManager = new SplitLogManager(zooKeeperWrapper, conf,
+            this.shutdownRequested, address.toString());
+        this.splitLogManager.finishInitialization();
+      }
+      // Start the server so that region servers are running before we start
+      // splitting logs and before we start assigning regions. XXX What will happen
+      // if master starts receiving requests before regions are assigned?
       this.rpcServer.start();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Started service threads");
       }
+      splitLogAfterStartup();
     } catch (IOException e) {
       if (e instanceof RemoteException) {
         try {
@@ -984,6 +1061,9 @@ public class HMaster extends Thread impl
     LOG.info("Cluster shutdown requested. Starting to quiesce servers");
     this.shutdownRequested.set(true);
     this.zooKeeperWrapper.setClusterState(false);
+    if (splitLogManager != null) {
+      this.splitLogManager.stop();
+    }
   }
 
   @Override
@@ -1742,4 +1822,8 @@ public class HMaster extends Thread impl
     closed.set(true);
   }
 
+  public SplitLogManager getSplitLogManager() {
+    return this.splitLogManager;
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java Mon Oct 24 22:30:31 2011
@@ -19,9 +19,13 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -31,18 +35,10 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.master.RegionManager.RegionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.master.RegionManager.RegionState;
-import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * Instantiated when a server's lease has expired, meaning it has crashed.
@@ -294,43 +290,7 @@ class ProcessServerShutdown extends Regi
       ", onlineMetaRegions.size(): " +
       master.getRegionManager().numOnlineMetaRegions());
     if (!isSplitFinished) {
-      long splitTime = 0, splitSize = 0, splitCount = 0;
-      FileSystem fs = this.master.getFileSystem();
-      // we rename during split, so check both names
-      Path rsSplitDir = new Path(rsLogDir.getParent(),
-                                 rsLogDir.getName()
-                                 + HConstants.HLOG_SPLITTING_EXT);
-      boolean logDirExists = fs.exists(rsLogDir);
-      boolean splitDirExists = fs.exists(rsSplitDir);
-      assert !(logDirExists && splitDirExists)
-        : "Both files shouldn't exist: " + rsLogDir + " and " + rsSplitDir;
-
-      if (logDirExists || splitDirExists) {
-        if (!master.splitLogLock.tryLock()) {
-          return false;
-        }
-        try {
-          // rename the directory so a rogue RS doesn't create more HLogs
-          if (logDirExists) {
-            if (!fs.rename(rsLogDir, rsSplitDir)) {
-              throw new IOException("Failed fs.rename of " + rsLogDir);
-            }
-            LOG.debug("Renamed region directory: " + rsSplitDir);
-          }
-          ContentSummary contentSum = fs.getContentSummary(rsSplitDir);
-          splitCount = contentSum.getFileCount();
-          splitSize = contentSum.getSpaceConsumed();
-          // Process the old log files
-          HLog.splitLog(master.getRootDir(), rsSplitDir,
-            this.master.getOldLogDir(), this.master.getFileSystem(),
-            this.master.getConfiguration());
-          splitTime = HLog.lastSplitTime;
-          this.master.getMetrics().addSplit(splitTime, splitCount, splitSize);
-        } finally {
-          master.splitLogLock.unlock();
-        }
-
-      }
+      this.master.splitLog(deadServer);
       isSplitFinished = true;
     }
     LOG.info("Log split complete, meta reassignment and scanning:");

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Mon Oct 24 22:30:31 2011
@@ -272,6 +272,11 @@ public class RegionManager {
    */
   void assignRegions(HServerInfo info, HRegionInfo[] mostLoadedRegions,
       ArrayList<HMsg> returnMsgs) {
+    if (this.master.getIsSplitLogAfterStartupDone() == false) {
+      // wait for log splitting at startup to complete. The regions will
+      // be assigned when the region server reports next
+      return;
+    }
     // the region may assigned to this region server
     Set<RegionState> regionsToAssign = null;
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Mon Oct 24 22:30:31 2011
@@ -935,6 +935,9 @@ public class ServerManager {
     LOG.debug("Added=" + serverName +
       " to dead servers, added shutdown processing operation");
     this.deadServers.add(serverName);
+    if (this.master.getSplitLogManager() != null) {
+      this.master.getSplitLogManager().handleDeadWorker(serverName);
+    }
     this.master.getRegionServerOperationQueue().
       put(new ProcessServerShutdown(master, info));
     this.master.getMetrics().incRegionServerExpired();

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,1077 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*;
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Distributes the task of log splitting to the available region servers.
+ * Coordination happens via zookeeper. For every log file that has to be split a
+ * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task.
+ *
+ * SplitLogManager monitors the task znodes that it creates using the
+ * {@link #timeoutMonitor} thread. If a task's progress is slow then
+ * {@link #resubmit(String, boolean)} will take away the task from the owner
+ * {@link SplitLogWorker} and the task will be
+ * upforgrabs again. When the task is done then the task's znode is deleted by
+ * SplitLogManager.
+ *
+ * Clients call {@link #splitLogDistributed(Path)} to split a region server's
+ * log files. The caller thread waits in this method until all the log files
+ * have been split.
+ *
+ * All the zookeeper calls made by this class are asynchronous. This is mainly
+ * to help reduce response time seen by the callers.
+ *
+ * There is a race in this design between the SplitLogManager and the
+ * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
+ * already been completed by a SplitLogWorker. We rely on the idempotency of
+ * the log splitting task for correctness.
+ *
+ * It is also assumed that every log splitting task is unique and once
+ * completed (either with success or with error) it will not be submitted
+ * again. If a task is resubmitted then there is a risk that old "delete task"
+ * can delete the re-submission.
+ */
+public class SplitLogManager implements Watcher {
+  private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
+
+  private final AtomicBoolean stopper;
+  private final String serverName;
+  private final TaskFinisher taskFinisher;
+  private FileSystem fs;
+  private Configuration conf;
+  protected ZooKeeperWrapper watcher;
+
+  private long zkretries;
+  private long resubmit_threshold;
+  private long timeout;
+  private long unassignedTimeout;
+  private long lastNodeCreateTime = Long.MAX_VALUE;
+
+  private ConcurrentMap<String, Task> tasks =
+    new ConcurrentHashMap<String, Task>();
+  private TimeoutMonitor timeoutMonitor;
+
+  private Set<String> deadWorkers = null;
+  private Object deadWorkersLock = new Object();
+
+  /**
+   * It's OK to construct this object even when region-servers are not online. It
+   * does lookup the orphan tasks in zk but it doesn't block for them to be
+   * done.
+   *
+   * @param zkw
+   * @param conf
+   * @param stopper
+   * @param serverName
+   * @param services
+   * @param service
+   */
+  public SplitLogManager(ZooKeeperWrapper zkw, final Configuration conf,
+      AtomicBoolean stopper, String serverName) {
+    this(zkw, conf, stopper, serverName, new TaskFinisher() {
+      @Override
+      public Status finish(String workerName, String logfile) {
+        String tmpname =
+          ZKSplitLog.getSplitLogDirTmpComponent(workerName, logfile);
+        try {
+          HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
+        } catch (IOException e) {
+          LOG.warn("Could not finish splitting of log file " + logfile);
+          return Status.ERR;
+        }
+        return Status.DONE;
+      }
+    });
+  }
+
+  public SplitLogManager(ZooKeeperWrapper zkw, Configuration conf,
+      AtomicBoolean stopper, String serverName, TaskFinisher tf) {
+    this.watcher = zkw;
+    this.watcher.createZNodeIfNotExists(this.watcher.splitLogZNode, new byte[0],
+        CreateMode.PERSISTENT, false /* set watch? */);
+    this.taskFinisher = tf;
+    this.conf = conf;
+    this.stopper = stopper;
+    this.zkretries = conf.getLong("hbase.splitlog.zk.retries",
+        ZKSplitLog.DEFAULT_ZK_RETRIES);
+    this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit",
+        ZKSplitLog.DEFAULT_MAX_RESUBMIT);
+    this.timeout = conf.getInt("hbase.splitlog.manager.timeout",
+        ZKSplitLog.DEFAULT_TIMEOUT);
+    this.unassignedTimeout =
+      conf.getInt("hbase.splitlog.manager.unassigned.timeout",
+        ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT);
+    LOG.debug("timeout = " + timeout);
+    LOG.debug("unassigned timeout = " + unassignedTimeout);
+
+    this.serverName = serverName;
+    this.timeoutMonitor = new TimeoutMonitor(
+        conf.getInt("hbase.splitlog.manager.timeoutmonitor.period",
+            1000),
+        stopper);
+  }
+
+  public void finishInitialization() {
+    Threads.setDaemonThreadRunning(timeoutMonitor, serverName
+        + ".splitLogManagerTimeoutMonitor");
+    this.watcher.registerListener(this);
+    lookForOrphans();
+  }
+
+  /**
+   * Collects the status of every entry found directly under the given log
+   * directories. Directories that do not exist or are empty are logged and
+   * skipped. As a side effect {@link #fs} is set to the file system of the
+   * last directory visited.
+   *
+   * @param logDirs directories to list
+   * @return the entries of all the directories, possibly an empty array
+   * @throws IOException if a file system call fails
+   */
+  private FileStatus[] getFileList(List<Path> logDirs) throws IOException {
+    List<FileStatus> collected = new ArrayList<FileStatus>();
+    for (Path hLogDir : logDirs) {
+      this.fs = hLogDir.getFileSystem(conf);
+      if (!fs.exists(hLogDir)) {
+        LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
+        continue;
+      }
+      FileStatus[] logfiles = fs.listStatus(hLogDir); // TODO filter filenames?
+      if (logfiles == null || logfiles.length == 0) {
+        LOG.info(hLogDir + " is empty dir, no logs to split");
+        continue;
+      }
+      for (int i = 0; i < logfiles.length; i++) {
+        collected.add(logfiles[i]);
+      }
+    }
+    return collected.toArray(new FileStatus[collected.size()]);
+  }
+
+  /**
+   * Convenience overload that splits the logs of a single directory by
+   * delegating to {@link #splitLogDistributed(List)}.
+   *
+   * @param logDir
+   *            one region server hlog dir path in .logs
+   * @throws IOException
+   *             if there was an error while splitting any log file
+   * @return cumulative size of the logfiles split
+   */
+  public long splitLogDistributed(final Path logDir) throws IOException {
+    final List<Path> singleDir = new ArrayList<Path>(1);
+    singleDir.add(logDir);
+    return splitLogDistributed(singleDir);
+  }
+
+  /**
+   * The caller will block until all the log files in the given directories
+   * have been processed - successfully split or an error is encountered - by
+   * an available worker region server. This method must only be called after
+   * the region servers have been brought online.
+   * <p>
+   * On success every directory that still exists is deleted. If a directory
+   * contains files that were not part of the original listing an
+   * OrphanHLogAfterSplitException is thrown instead of deleting it.
+   *
+   * @param logDirs
+   *          region server hlog directories whose log files are to be split
+   * @throws IOException
+   *          if there was an error while splitting any log file
+   * @return cumulative size of the logfiles split
+   */
+  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
+    MonitoredTask status = TaskMonitor.get().createStatus(
+          "Doing distributed log split in " + logDirs);
+    try {
+      // The listing is done inside the try block so that the monitored
+      // status is cleaned up even when the listing itself throws.
+      FileStatus[] logfiles = getFileList(logDirs);
+      status.setStatus("Checking directory contents...");
+      LOG.debug("Scheduling batch of logs to split");
+      tot_mgr_log_split_batch_start.incrementAndGet();
+      LOG.info("started splitting logs in " + logDirs);
+      long t = EnvironmentEdgeManager.currentTimeMillis();
+      long totalSize = 0;
+      TaskBatch batch = new TaskBatch();
+      for (FileStatus lf : logfiles) {
+        // TODO If the log file is still being written to - which is most likely
+        // the case for the last log file - then its length will show up here
+        // as zero. The size of such a file can only be retrieved after
+        // recover-lease is done. totalSize will be under in most cases and the
+        // metrics that it drives will also be under-reported.
+        totalSize += lf.getLen();
+        if (installTask(lf.getPath().toString(), batch) == false) {
+          throw new IOException("duplicate log split scheduled for "
+              + lf.getPath());
+        }
+      }
+      waitTasks(batch, status);
+      // waitTasks() also returns when stopped or interrupted, or when some
+      // tasks ended in error; in all those cases done != installed.
+      if (batch.done != batch.installed) {
+        stopTrackingTasks(batch);
+        tot_mgr_log_split_batch_err.incrementAndGet();
+        LOG.warn("error while splitting logs in " + logDirs + " installed = " +
+            batch.installed + " but only " + batch.done + " done");
+        throw new IOException("error or interrupt while splitting logs in "
+            + logDirs + " Task = " + batch);
+      }
+      for (Path logDir : logDirs) {
+        if (!fs.exists(logDir)) {
+          continue;
+        }
+        if (anyNewLogFiles(logDir, logfiles)) {
+          tot_mgr_new_unexpected_hlogs.incrementAndGet();
+          LOG.warn("new hlogs were produced while logs in " + logDir +
+              " were being split");
+          throw new OrphanHLogAfterSplitException();
+        }
+        status.setStatus("Cleaning up log directory...");
+        if (!fs.delete(logDir, true)) {
+          throw new IOException("Unable to delete src dir: " + logDir);
+        }
+      }
+      tot_mgr_log_split_batch_success.incrementAndGet();
+      String msg = "finished splitting (more than or equal to) " + totalSize
+          + " bytes in " + batch.installed + " log files in " + logDirs
+          + " in " + (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
+      status.markComplete(msg);
+      LOG.info(msg);
+      return totalSize;
+    } finally {
+      status.cleanup();
+    }
+  }
+
+  /**
+   * Registers a split task for the given log file with the batch and, if no
+   * one else is already waiting on it, publishes the task in zookeeper.
+   *
+   * @param taskname name of the task, the path of the log file to split
+   * @param batch batch that the caller will wait on
+   * @return true if the task was installed for this batch, false if another
+   *         caller is already waiting for the same task
+   */
+  boolean installTask(String taskname, TaskBatch batch) {
+    tot_mgr_log_split_start.incrementAndGet();
+    String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
+    Task oldtask = createTaskIfAbsent(path, batch);
+    if (oldtask == null) {
+      // publish the task in zk
+      createNode(path, zkretries);
+      return true;
+    }
+    LOG.warn(path + " is already being split. " +
+        "Two threads cannot wait for the same task");
+    return false;
+  }
+
+  /**
+   * Blocks the caller until every task installed in the batch has been
+   * reported as either done or in error. Returns early, with the batch still
+   * incomplete, if the stopper flag is set or the waiting thread is
+   * interrupted (the interrupt flag is restored in that case).
+   *
+   * @param batch the batch to wait on; its monitor is used for wait/notify
+   * @param status monitored task that is updated with the current progress
+   */
+  private void waitTasks(TaskBatch batch, MonitoredTask status) {
+    synchronized (batch) {
+      while ((batch.done + batch.error) != batch.installed) {
+        try {
+          status.setStatus("Waiting for distributed tasks to finish. "
+              + " scheduled=" + batch.installed
+              + " done=" + batch.done
+              + " error=" + batch.error);
+          // timed wait so that the stopper flag is re-checked periodically
+          // even when no task completes
+          batch.wait(100);
+          if (stopper.get()) {
+            LOG.warn("Stopped while waiting for log splits to be completed");
+            return;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted while waiting for log splits to be completed");
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Records that the task at path has terminated. Updates the success/error
+   * metrics (rescan nodes are not counted), accounts for the task in its
+   * batch - waking up the waiter once all installed tasks are accounted for -
+   * marks the in-memory task as deleted and finally queues an asynchronous
+   * delete of the znode.
+   *
+   * @param path znode of the task (or rescan node) that terminated
+   * @param status SUCCESS or FAILURE
+   */
+  private void setDone(String path, TerminationStatus status) {
+    if (!ZKSplitLog.isRescanNode(watcher, path)) {
+      if (status == SUCCESS) {
+        tot_mgr_log_split_success.incrementAndGet();
+        LOG.info("Done splitting " + path);
+      } else {
+        tot_mgr_log_split_err.incrementAndGet();
+        LOG.warn("Error splitting " + path);
+      }
+    }
+    Task task = tasks.get(path);
+    if (task == null) {
+      // no in-memory state for this path
+      if (!ZKSplitLog.isRescanNode(watcher, path)) {
+        tot_mgr_unacquired_orphan_done.incrementAndGet();
+        LOG.debug("unacquired orphan task is done " + path);
+      }
+    } else {
+      // if in stopTrackingTasks() we were to make tasks orphan instead of
+      // forgetting about them then we will have to handle the race when
+      // accessing task.batch here.
+      if (!task.isOrphan()) {
+        // counters are updated under the batch monitor, the same monitor
+        // that waitTasks() waits on
+        synchronized (task.batch) {
+          if (status == SUCCESS) {
+            task.batch.done++;
+          } else {
+            task.batch.error++;
+          }
+          if ((task.batch.done + task.batch.error) == task.batch.installed) {
+            task.batch.notify();
+          }
+        }
+      }
+      // resubmit() checks this flag and refuses to resubmit a deleted task
+      task.deleted = true;
+    }
+    // delete the task node in zk. Keep trying indefinitely - its an async
+    // call and no one is blocked waiting for this node to be deleted. All
+    // task names are unique (log.<timestamp>) there is no risk of deleting
+    // a future task.
+    deleteNode(path, Long.MAX_VALUE);
+    return;
+  }
+
+  /**
+   * Queues an asynchronous create of a persistent task znode at path whose
+   * data is the UNASSIGNED state stamped with this server's name. The result
+   * is delivered to a {@link CreateAsyncCallback}, which receives
+   * retry_count as its context.
+   *
+   * @param path znode to create
+   * @param retry_count number of retries still allowed
+   */
+  private void createNode(String path, Long retry_count) {
+    watcher.asyncCreate(
+        path,
+        TaskState.TASK_UNASSIGNED.get(serverName),
+        CreateMode.PERSISTENT,
+        new CreateAsyncCallback(),
+        retry_count);
+    tot_mgr_node_create_queued.incrementAndGet();
+  }
+
+  private void createNodeSuccess(String path) {
+    lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
+    LOG.debug("put up splitlog task at znode " + path);
+    getDataSetWatch(path, zkretries);
+  }
+
+  /**
+   * Called when the task znode could not be created even after all retries.
+   * The task is failed.
+   *
+   * @param path znode that could not be created
+   */
+  private void createNodeFailure(String path) {
+    // TODO the Manager should split the log locally instead of giving up
+    LOG.warn("failed to create task node " + path);
+    setDone(path, FAILURE);
+  }
+
+  /**
+   * Queues an asynchronous read of the znode's data, registering the
+   * zookeeper wrapper as the watcher. The result is delivered to a
+   * {@link GetDataAsyncCallback}, which receives retry_count as its context.
+   *
+   * @param path znode to read and watch
+   * @param retry_count number of retries still allowed
+   */
+  private void getDataSetWatch(String path, Long retry_count) {
+    watcher.getZooKeeper().getData(
+        path,
+        watcher,
+        new GetDataAsyncCallback(),
+        retry_count);
+    tot_mgr_get_data_queued.incrementAndGet();
+  }
+
+  /**
+   * Handles the data read from a task (or rescan) znode and dispatches on the
+   * task state encoded in it: UNASSIGNED, OWNED, RESIGNED, DONE or ERR. Null
+   * data or an unrecognized state fails the task.
+   *
+   * @param path znode that was read
+   * @param data contents of the znode
+   * @param version znode version reported together with the data
+   */
+  private void getDataSetWatchSuccess(String path, byte[] data, int version) {
+    if (data == null) {
+      tot_mgr_null_data.incrementAndGet();
+      LOG.fatal("logic error - got null data " + path);
+      setDone(path, FAILURE);
+      return;
+    }
+    // LOG.debug("set watch on " + path + " got data " + new String(data));
+    if (TaskState.TASK_UNASSIGNED.equals(data)) {
+      LOG.debug("task not yet acquired " + path + " ver = " + version);
+      handleUnassignedTask(path);
+    } else if (TaskState.TASK_OWNED.equals(data)) {
+      // a worker owns the task; treat the read as a heartbeat from it
+      heartbeat(path, version,
+          TaskState.TASK_OWNED.getWriterName(data));
+    } else if (TaskState.TASK_RESIGNED.equals(data)) {
+      LOG.info("task " + path + " entered state " + new String(data));
+      // the resubmit is forced: no timeout or resubmit-threshold check
+      resubmitOrFail(path, FORCE);
+    } else if (TaskState.TASK_DONE.equals(data)) {
+      boolean isRescan = ZKSplitLog.isRescanNode(watcher, path);
+      if (!isRescan) {
+        LOG.info("task " + path + " entered state " + new String(data));
+      }
+      // real tasks get a chance to be finished off by the TaskFinisher;
+      // rescan nodes (and tasks when no finisher is set) are simply done
+      if (taskFinisher != null && !isRescan) {
+        if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data),
+            ZKSplitLog.getFileName(path)) == Status.DONE) {
+          setDone(path, SUCCESS);
+        } else {
+          resubmitOrFail(path, CHECK);
+        }
+      } else {
+        setDone(path, SUCCESS);
+      }
+    } else if (TaskState.TASK_ERR.equals(data)) {
+      LOG.info("task " + path + " entered state " + new String(data));
+      resubmitOrFail(path, CHECK);
+    } else {
+      LOG.fatal("logic error - unexpected zk state for path = " + path
+          + " data = " + new String(data));
+      setDone(path, FAILURE);
+    }
+  }
+
+  private void getDataSetWatchFailure(String path) {
+    LOG.warn("failed to set data watch " + path);
+    setDone(path, FAILURE);
+  }
+
+  /**
+   * A task can get stuck in the UNASSIGNED state - for example when a
+   * SplitLogManager forces a task to UNASSIGNED in order to resubmit it but
+   * dies before creating the RESCAN node that tells the SplitLogWorkers to
+   * pick the task up. To guard against that, an orphan task that is found
+   * UNASSIGNED and has never been resubmitted by this manager is resubmitted
+   * here. Rescan nodes are ignored.
+   *
+   * @param path znode of the unassigned task
+   */
+  private void handleUnassignedTask(String path) {
+    if (ZKSplitLog.isRescanNode(watcher, path)) {
+      return;
+    }
+    Task task = findOrCreateOrphanTask(path);
+    boolean freshOrphan = task.isOrphan() && (task.incarnation == 0);
+    if (!freshOrphan) {
+      return;
+    }
+    LOG.info("resubmitting unassigned orphan task " + path);
+    // a failed resubmit is deliberately ignored, the timeout-monitor will
+    // deal with the task later albeit in a more crude fashion
+    resubmit(path, task, FORCE);
+  }
+
+  /**
+   * Records a sign of life from the worker that owns the task. A heartbeat
+   * is only accepted when the znode version differs from the last one seen;
+   * a repeated version is reported as a duplicate.
+   *
+   * @param path znode of the task
+   * @param new_version znode version that came with the data
+   * @param workerName name of the worker that wrote the data
+   */
+  private void heartbeat(String path, int new_version,
+      String workerName) {
+    Task task = findOrCreateOrphanTask(path);
+    if (new_version == task.last_version) {
+      assert false;
+      LOG.warn("got dup heartbeat for " + path + " ver = " + new_version);
+      return;
+    }
+    if (task.isUnassigned()) {
+      LOG.info("task " + path + " acquired by " + workerName);
+    }
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    task.heartbeat(now, new_version, workerName);
+    tot_mgr_heartbeat.incrementAndGet();
+  }
+
+  /**
+   * Tries to put the task back into the UNASSIGNED state in zookeeper so that
+   * it can be picked up again, and creates a RESCAN node to signal the
+   * workers.
+   * <p>
+   * Unless the directive is FORCE the task is only resubmitted when it has
+   * not been updated for at least the configured timeout and the number of
+   * unforced resubmits is below the threshold; the znode is then updated
+   * conditionally on the last version seen. A forced resubmit skips those
+   * checks and passes -1 as the expected version.
+   *
+   * @param path znode of the task
+   * @param task in-memory state of the task
+   * @param directive CHECK or FORCE
+   * @return true if the task was resubmitted, false otherwise
+   */
+  private boolean resubmit(String path, Task task,
+      ResubmitDirective directive) {
+    // its ok if this thread misses the update to task.deleted. It will
+    // fail later
+    if (task.deleted) {
+      return false;
+    }
+    int version;
+    if (directive != FORCE) {
+      if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) <
+          timeout) {
+        return false;
+      }
+      if (task.unforcedResubmits >= resubmit_threshold) {
+        // log and count only the first time the threshold is hit
+        if (task.unforcedResubmits == resubmit_threshold) {
+          tot_mgr_resubmit_threshold_reached.incrementAndGet();
+          LOG.info("Skipping resubmissions of task " + path +
+              " because threshold " + resubmit_threshold + " reached");
+        }
+        return false;
+      }
+      // race with heartbeat that might be changing last_version
+      version = task.last_version;
+    } else {
+      version = -1;
+    }
+    LOG.info("resubmitting task " + path);
+    // NOTE: incarnation is bumped before the zk update is known to succeed
+    task.incarnation++;
+    try {
+      // blocking zk call but this is done from the timeout thread
+      if (watcher.setData(path, TaskState.TASK_UNASSIGNED.get(serverName),
+          version) == false) {
+        LOG.debug("failed to resubmit task " + path +
+            " version changed");
+        return false;
+      }
+    } catch (NoNodeException e) {
+      LOG.debug("failed to resubmit " + path + " task done");
+      return false;
+    } catch (KeeperException e) {
+      tot_mgr_resubmit_failed.incrementAndGet();
+      LOG.warn("failed to resubmit " + path, e);
+      return false;
+    }
+    // don't count forced resubmits
+    if (directive != FORCE) {
+      task.unforcedResubmits++;
+    }
+    task.setUnassigned();
+    createRescanNode(Long.MAX_VALUE);
+    tot_mgr_resubmit.incrementAndGet();
+    return true;
+  }
+
+  /**
+   * Resubmits the task at path and, if that is not possible, fails it.
+   *
+   * @param path znode of the task
+   * @param directive CHECK or FORCE, passed on to resubmit()
+   */
+  private void resubmitOrFail(String path,
+      ResubmitDirective directive) {
+    Task task = findOrCreateOrphanTask(path);
+    boolean resubmitted = resubmit(path, task, directive);
+    if (!resubmitted) {
+      setDone(path, FAILURE);
+    }
+  }
+
+  /**
+   * Queues an asynchronous, unconditional (version -1) delete of the znode.
+   * The result is delivered to a {@link DeleteAsyncCallback}, which receives
+   * retries as its context.
+   *
+   * @param path znode to delete
+   * @param retries number of retries still allowed
+   */
+  private void deleteNode(String path, Long retries) {
+    tot_mgr_node_delete_queued.incrementAndGet();
+    watcher.getZooKeeper().delete(
+        path,
+        -1,
+        new DeleteAsyncCallback(),
+        retries);
+  }
+
+  /**
+   * Called once the znode is gone. Drops the in-memory task for the path and
+   * updates the metric that matches what was deleted: a tracked task, a
+   * rescan node, or a node for which no in-memory state existed.
+   *
+   * @param path znode that was deleted
+   */
+  private void deleteNodeSuccess(String path) {
+    Task removed = tasks.remove(path);
+    if (removed != null) {
+      tot_mgr_task_deleted.incrementAndGet();
+      return;
+    }
+    if (ZKSplitLog.isRescanNode(watcher, path)) {
+      tot_mgr_rescan_deleted.incrementAndGet();
+    } else {
+      tot_mgr_missing_state_in_delete.incrementAndGet();
+      LOG.debug("deleted task without in memory state " + path);
+    }
+  }
+
+  private void deleteNodeFailure(String path) {
+    LOG.fatal("logic failure, failing to delete a node should never happen " +
+        "because delete has infinite retries");
+    return;
+  }
+
+  /**
+   * Signals the workers that a task was resubmitted by creating the
+   * RESCAN node. The create is asynchronous; its result is delivered to a
+   * {@link CreateRescanAsyncCallback}.
+   *
+   * @param retries number of retries still allowed for the create
+   */
+  private void createRescanNode(long retries) {
+    // The RESCAN node will be deleted almost immediately by the
+    // SplitLogManager as soon as it is created because it is being
+    // created in the DONE state. This behavior prevents a buildup
+    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
+    // might miss the watch-trigger that creation of RESCAN node provides.
+    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
+    // therefore this behavior is safe.
+    watcher.asyncCreate(ZKSplitLog.getRescanNode(watcher),
+        TaskState.TASK_DONE.get(serverName), CreateMode.EPHEMERAL_SEQUENTIAL,
+        new CreateRescanAsyncCallback(), Long.valueOf(retries));
+  }
+
+  private void createRescanSuccess(String path) {
+    lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
+    tot_mgr_rescan.incrementAndGet();
+    getDataSetWatch(path, zkretries);
+  }
+
+  /**
+   * Called when the create-RESCAN callback runs out of retries. Only logs
+   * the failure.
+   */
+  private void createRescanFailure() {
+    LOG.fatal("logic failure, rescan failure must not happen");
+  }
+
+  /**
+   * Registers the in-memory state of the task at path on behalf of batch.
+   * If the path is already tracked as an orphan task, the orphan is adopted
+   * by the batch. In both cases the task is counted exactly once in
+   * batch.installed.
+   *
+   * @param path znode of the task
+   * @param batch batch that the caller will wait on
+   * @return null on success, existing task on error
+   */
+  private Task createTaskIfAbsent(String path, TaskBatch batch) {
+    // The Task constructor counts the new task in batch.installed.
+    Task newtask = new Task(batch);
+    Task oldtask = tasks.putIfAbsent(path, newtask);
+    if (oldtask == null) {
+      return null;
+    }
+    // newtask was not used, so take back the count that its constructor
+    // added. Otherwise an adopted orphan would be counted twice (once by the
+    // constructor and once by setBatch() below) and the thread waiting on
+    // the batch could never see done + error reach installed.
+    if (batch != null) {
+      batch.installed--;
+    }
+    if (oldtask.isOrphan()) {
+      LOG.info("Previously orphan task " + path +
+          " is now being waited upon");
+      oldtask.setBatch(batch);
+      return null;
+    }
+    return oldtask;
+  }
+
+  /**
+   * Makes the manager forget every task that belongs to the given batch. The
+   * active tasks themselves are not stopped; if they are resubmitted later
+   * they will be reacquired and monitored by the manager. This function
+   * should be called whenever batch processing terminates prematurely,
+   * otherwise a later re-submission of the same tasks might fail.
+   * <p>
+   * There is a slight race here: a thread that obtained a reference to a
+   * task before it was removed from {@link #tasks} keeps processing it. That
+   * is OK because neither the task nor the batch object is changed here.
+   * <p>
+   * TODO It is probably better to convert these to orphan tasks but then we
+   * have to deal with race conditions as we nullify Task's batch pointer etc.
+   * <p>
+   * @param batch batch whose tasks are to be forgotten
+   */
+  void stopTrackingTasks(TaskBatch batch) {
+    for (Map.Entry<String, Task> entry : tasks.entrySet()) {
+      // identity comparison is intended, equals() is not necessary
+      if (entry.getValue().batch != batch) {
+        continue;
+      }
+      tasks.remove(entry.getKey());
+    }
+  }
+
+  /**
+   * Returns the in-memory task tracked for path. If there is none, a new
+   * orphan task (one without a batch) is registered and returned.
+   *
+   * @param path znode of the task
+   * @return the task that is tracked for path after this call
+   */
+  Task findOrCreateOrphanTask(String path) {
+    Task candidate = new Task(null);
+    Task existing = tasks.putIfAbsent(path, candidate);
+    if (existing != null) {
+      return existing;
+    }
+    LOG.info("creating orphan task " + path);
+    tot_mgr_orphan_task_acquired.incrementAndGet();
+    return candidate;
+  }
+
+  /**
+   * Watcher callback. Only data changes are acted upon; every other event
+   * type is ignored.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    switch (event.getType()) {
+    case NodeDataChanged:
+      nodeDataChanged(event.getPath());
+      break;
+    case NodeCreated:
+    case NodeDeleted:
+    case NodeChildrenChanged:
+    default:
+      // no-op
+      break;
+    }
+  }
+
+  /**
+   * Re-reads (and re-watches) the znode whose data changed, provided it is a
+   * task this manager tracks or a rescan node. Changes on any other path are
+   * ignored.
+   *
+   * @param path znode whose data changed
+   */
+  private void nodeDataChanged(String path) {
+    boolean tracked = tasks.get(path) != null;
+    if (!tracked && !ZKSplitLog.isRescanNode(watcher, path)) {
+      return;
+    }
+    getDataSetWatch(path, zkretries);
+  }
+
+  /**
+   * Interrupts the timeout monitor, if there is one.
+   */
+  public void stop() {
+    if (timeoutMonitor == null) {
+      return;
+    }
+    timeoutMonitor.interrupt();
+  }
+
+  /**
+   * Lists the children of the splitlog znode and starts watching each of
+   * them, so that tasks and rescan nodes that are already present get picked
+   * up by this manager. Failure to list the children is logged and otherwise
+   * ignored.
+   */
+  private void lookForOrphans() {
+    List<String> children;
+    try {
+      children = watcher.listChildrenNoWatch(this.watcher.splitLogZNode);
+    } catch (KeeperException e) {
+      LOG.warn("could not get children of " + this.watcher.splitLogZNode +
+          " " + StringUtils.stringifyException(e));
+      return;
+    }
+    if (children == null) {
+      LOG.warn("could not get children of " + this.watcher.splitLogZNode);
+      return;
+    }
+    int taskNodes = 0;
+    int rescanNodes = 0;
+    for (String child : children) {
+      String nodepath = watcher.getZNode(watcher.splitLogZNode, child);
+      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
+        rescanNodes++;
+        LOG.debug("found orphan rescan node " + child);
+      } else {
+        taskNodes++;
+        LOG.info("found orphan task " + child);
+      }
+      getDataSetWatch(nodepath, zkretries);
+    }
+    LOG.info("found " + taskNodes + " orphan tasks and " +
+        rescanNodes + " rescan nodes");
+  }
+
+  /**
+   * Keeps track of the batch of tasks submitted together by a caller in
+   * splitLogDistributed(). Clients threads use this object to wait for all
+   * their tasks to be done.
+   * <p>
+   * All access is synchronized.
+   */
+  static class TaskBatch {
+    int installed;
+    int done;
+    int error;
+
+    @Override
+    public String toString() {
+      return ("installed = " + installed + " done = " + done + " error = "
+          + error);
+    }
+  }
+
+  /**
+   * in memory state of an active task. A task that has no batch is an
+   * "orphan": nobody is waiting for it in splitLogDistributed().
+   */
+  static class Task {
+    // time of the last heartbeat, -1 while the task is unassigned
+    long last_update;
+    // znode version seen with the last heartbeat, -1 initially
+    int last_version;
+    // worker that sent the last heartbeat, null while unassigned
+    String cur_worker_name;
+    // batch this task is counted in, null for an orphan task
+    TaskBatch batch;
+    // set once the task has terminated and its znode is being deleted
+    boolean deleted;
+    // number of times this manager has attempted to resubmit the task
+    int incarnation;
+    // number of successful resubmits that were not forced
+    int unforcedResubmits;
+
+    @Override
+    public String toString() {
+      return ("last_update = " + last_update +
+          " last_version = " + last_version +
+          " cur_worker_name = " + cur_worker_name +
+          " deleted = " + deleted +
+          " incarnation = " + incarnation +
+          " resubmits = " + unforcedResubmits +
+          " batch = " + batch);
+    }
+
+    /**
+     * Creates an unassigned task. If tb is not null the task is counted in
+     * tb.installed (see setBatch()).
+     *
+     * @param tb batch the task belongs to, or null for an orphan task
+     */
+    Task(TaskBatch tb) {
+      incarnation = 0;
+      last_version = -1;
+      deleted = false;
+      setBatch(tb);
+      setUnassigned();
+    }
+
+    /**
+     * Attaches the task to a batch. NOTE: as a side effect a non-null batch
+     * gets its installed counter incremented. Replacing one non-null batch
+     * with another is logged as a logic error but still carried out.
+     *
+     * @param batch the new batch, may be null
+     */
+    public void setBatch(TaskBatch batch) {
+      if (batch != null && this.batch != null) {
+        LOG.fatal("logic error - batch being overwritten");
+      }
+      this.batch = batch;
+      if (batch != null) {
+        batch.installed++;
+      }
+    }
+
+    /** @return true if no batch is waiting for this task */
+    public boolean isOrphan() {
+      return (batch == null);
+    }
+
+    /** @return true if no heartbeat was recorded since the task was last
+     *          marked unassigned */
+    public boolean isUnassigned() {
+      return (last_update == -1);
+    }
+
+    /**
+     * Records a heartbeat.
+     *
+     * @param time time at which the heartbeat was observed
+     * @param version znode version that came with the heartbeat
+     * @param worker name of the worker that owns the task
+     */
+    public void heartbeat(long time, int version, String worker) {
+      last_version = version;
+      last_update = time;
+      cur_worker_name = worker;
+    }
+
+    /** Forgets the current worker and marks the task as unassigned. */
+    public void setUnassigned() {
+      cur_worker_name = null;
+      last_update = -1;
+    }
+  }
+
+  /**
+   * Notes that a worker has died. The worker is only queued here; its tasks
+   * are resubmitted later by the TimeoutMonitor thread, which makes the
+   * concurrency easier to reason about and makes retrying easier.
+   *
+   * @param worker_name name of the dead worker
+   */
+  public void handleDeadWorker(String worker_name) {
+    synchronized (deadWorkersLock) {
+      // the set is created lazily, the TimeoutMonitor resets it to null
+      // after taking it over
+      if (null == deadWorkers) {
+        deadWorkers = new HashSet<String>(100);
+      }
+      deadWorkers.add(worker_name);
+    }
+    final String msg = "dead splitlog worker " + worker_name;
+    LOG.info(msg);
+  }
+
+  /**
+   * Periodically checks all active tasks and resubmits the ones that have timed
+   * out or whose worker has been reported dead. If every pending task has
+   * been unassigned for longer than the unassigned timeout a RESCAN node is
+   * put up to ping the workers.
+   */
+  private class TimeoutMonitor extends Chore {
+    // values that were last logged, used to avoid repeating the same line
+    private int reported_tot = -1;
+    private int reported_unassigned = -1;
+    public TimeoutMonitor(final int period, AtomicBoolean stopper) {
+      super("SplitLogManager Timeout Monitor", period, stopper);
+    }
+
+    @Override
+    protected void chore() {
+      int resubmitted = 0;
+      int unassigned = 0;
+      int tot = 0;
+      boolean found_assigned_task = false;
+      Set<String> localDeadWorkers;
+
+      // take over the set of dead workers collected by handleDeadWorker()
+      synchronized (deadWorkersLock) {
+        localDeadWorkers = deadWorkers;
+        deadWorkers = null;
+      }
+
+      for (Map.Entry<String, Task> e : tasks.entrySet()) {
+        String path = e.getKey();
+        Task task = e.getValue();
+        String cur_worker = task.cur_worker_name;
+        tot++;
+        // don't easily resubmit a task which hasn't been picked up yet. It
+        // might be a long while before a SplitLogWorker is free to pick up a
+        // task. This is because a SplitLogWorker picks up a task one at a
+        // time. If we want progress when there are no region servers then we
+        // will have to run a SplitLogWorker thread in the Master.
+        if (task.isUnassigned()) {
+          unassigned++;
+          continue;
+        }
+        found_assigned_task = true;
+        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
+          tot_mgr_resubmit_dead_server_task.incrementAndGet();
+          if (resubmit(path, task, FORCE)) {
+            resubmitted++;
+          } else {
+            // queue the worker again so that the next run retries
+            handleDeadWorker(cur_worker);
+            LOG.warn("Failed to resubmit task " + path + " owned by dead " +
+                cur_worker + ", will retry.");
+          }
+        } else if (resubmit(path, task, CHECK)) {
+          resubmitted++;
+        }
+      }
+      if (reported_tot != tot || reported_unassigned != unassigned) {
+        LOG.info("total tasks = " + tot + " unassigned = " + unassigned);
+        reported_tot = tot;
+        reported_unassigned = unassigned;
+      }
+      if (resubmitted > 0) {
+        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
+      }
+      // If there are pending tasks and all of them have been unassigned for
+      // some time then put up a RESCAN node to ping the workers.
+      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
+      // because a. it is very unlikely that every worker had a
+      // transient error when trying to grab the task b. if there are no
+      // workers then all tasks will stay unassigned indefinitely and the
+      // manager will be indefinitely creating RESCAN nodes. TODO may be the
+      // master should spawn both a manager and a worker thread to guarantee
+      // that there is always one worker in the system
+      if (tot > 0 && !found_assigned_task &&
+          ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
+          unassignedTimeout)) {
+        createRescanNode(Long.MAX_VALUE);
+        tot_mgr_resubmit_unassigned.incrementAndGet();
+        LOG.debug("resubmitting unassigned task(s) after timeout");
+      }
+    }
+  }
+
+  /**
+   * Receives the result of an asynchronous create of a task znode. A node
+   * that already exists is treated like a successful create. Any other
+   * error is retried until the retry count passed as context reaches zero,
+   * at which point the create is reported as failed.
+   */
+  class CreateAsyncCallback implements AsyncCallback.StringCallback {
+    private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name) {
+      tot_mgr_node_create_result.incrementAndGet();
+      if (rc == 0) {
+        createNodeSuccess(path);
+        return;
+      }
+      if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+        LOG.debug("found pre-existing znode " + path);
+        tot_mgr_node_already_exists.incrementAndGet();
+        createNodeSuccess(path);
+        return;
+      }
+      Long retry_count = (Long)ctx;
+      LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
+          path + " retry=" + retry_count);
+      if (retry_count == 0) {
+        tot_mgr_node_create_err.incrementAndGet();
+        createNodeFailure(path);
+      } else {
+        tot_mgr_node_create_retry.incrementAndGet();
+        createNode(path, retry_count - 1);
+      }
+    }
+  }
+
+  /**
+   * Receives the result of an asynchronous get-data-and-set-watch on a
+   * znode. Errors are retried until the retry count passed as context
+   * reaches zero, at which point the read is reported as failed.
+   */
+  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
+    private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, byte[] data,
+        Stat stat) {
+      byte[] newData = RecoverableZooKeeper.removeMetaData(data);
+      tot_mgr_get_data_result.incrementAndGet();
+      if (rc == 0) {
+        getDataSetWatchSuccess(path, newData, stat.getVersion());
+        return;
+      }
+      Long retry_count = (Long) ctx;
+      LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
+          path + " retry=" + retry_count);
+      if (retry_count == 0) {
+        tot_mgr_get_data_err.incrementAndGet();
+        getDataSetWatchFailure(path);
+      } else {
+        tot_mgr_get_data_retry.incrementAndGet();
+        getDataSetWatch(path, retry_count - 1);
+      }
+    }
+  }
+
+  /**
+   * Receives the result of an asynchronous delete of a znode. A node that
+   * does not exist is treated like a successful delete. Any other error is
+   * retried until the retry count passed as context reaches zero, at which
+   * point the delete is reported as failed.
+   */
+  class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
+    private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx) {
+      tot_mgr_node_delete_result.incrementAndGet();
+      if (rc == 0) {
+        LOG.debug("deleted " + path);
+        deleteNodeSuccess(path);
+        return;
+      }
+      if (rc == KeeperException.Code.NONODE.intValue()) {
+        LOG.debug(path
+            + " does not exist, either was never created or was deleted"
+            + " in earlier rounds, zkretries = " + (Long) ctx);
+        deleteNodeSuccess(path);
+        return;
+      }
+      tot_mgr_node_delete_err.incrementAndGet();
+      Long retry_count = (Long) ctx;
+      LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
+          path + " retry=" + retry_count);
+      if (retry_count == 0) {
+        LOG.warn("delete failed " + path);
+        deleteNodeFailure(path);
+      } else {
+        deleteNode(path, retry_count - 1);
+      }
+    }
+  }
+
+  /**
+   * Asynchronous handler for zk create RESCAN-node results.
+   * Retries on failures.
+   * <p>
+   * A RESCAN node is created using EPHEMERAL_SEQUENTIAL flag. It is a signal
+   * for all the {@link SplitLogWorker}s to rescan for new tasks.
+   */
+  class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
+    private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name) {
+      if (rc != 0) {
+        // ctx carries the number of retries that are still allowed
+        Long retry_count = (Long)ctx;
+        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
+            " retry=" + retry_count);
+        if (retry_count == 0) {
+          createRescanFailure();
+        } else {
+          createRescanNode(retry_count - 1);
+        }
+        return;
+      }
+      // path is the original arg, name is the actual name that was created
+      createRescanSuccess(name);
+    }
+  }
+
+  /**
+   * Re-lists logdir and reports whether it now contains an entry that is not
+   * part of the original set of log files.
+   *
+   * @param logdir directory to re-list; null yields false
+   * @param logfiles the entries that were known when the split started
+   * @return True if a new log file is found
+   * @throws IOException if listing the directory fails
+   */
+  public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles)
+  throws IOException {
+    if (logdir == null) {
+      return false;
+    }
+    LOG.debug("re-listing " + logdir);
+    tot_mgr_relist_logdir.incrementAndGet();
+    FileStatus[] newfiles = fs.listStatus(logdir);
+    if (newfiles == null) {
+      return false;
+    }
+    for (FileStatus candidate : newfiles) {
+      boolean known = false;
+      for (int i = 0; i < logfiles.length && !known; i++) {
+        known = logfiles[i].equals(candidate);
+      }
+      if (!known) {
+        LOG.warn("Discovered orphan hlog " + candidate + " after split." +
+        " Maybe HRegionServer was not dead when we started");
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * {@link SplitLogManager} can use objects implementing this interface to
+   * finish off a partially done task by {@link SplitLogWorker}. This provides
+   * a serialization point at the end of the task processing.
+   */
+  static public interface TaskFinisher {
+    /**
+     * status that can be returned finish()
+     */
+    static public enum Status {
+      /**
+       * task completed successfully
+       */
+      DONE(),
+      /**
+       * task completed with error
+       */
+      ERR();
+    }
+    /**
+     * finish the partially done task. workername provides clue to where the
+     * partial results of the partially done tasks are present. taskname is the
+     * name of the task that was put up in zookeeper.
+     * <p>
+     * @param workerName
+     * @param taskname
+     * @return DONE if task completed successfully, ERR otherwise
+     */
+    public Status finish(String workerName, String taskname);
+  }
+  /**
+   * Tells resubmit() whether the timeout and resubmit-threshold checks apply.
+   */
+  enum ResubmitDirective {
+    /** resubmit only if the timeout and threshold checks pass */
+    CHECK,
+    /** resubmit without those checks */
+    FORCE;
+  }
+  /**
+   * Final outcome of a task as recorded by setDone().
+   */
+  enum TerminationStatus {
+    SUCCESS,
+    FAILURE;
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Oct 24 22:30:31 2011
@@ -246,6 +246,9 @@ public class HRegionServer implements HR
 
   private ZooKeeperWrapper zooKeeperWrapper;
 
+  // Log Splitting Worker
+  private SplitLogWorker splitLogWorker;
+
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
 
@@ -604,6 +607,9 @@ public class HRegionServer implements HR
     this.leases.closeAfterLeasesExpire();
     this.worker.stop();
     this.server.stop();
+    if (this.splitLogWorker != null) {
+      splitLogWorker.stop();
+    }
     if (this.infoServer != null) {
       LOG.info("Stopping infoServer");
       try {
@@ -1247,6 +1253,10 @@ public class HRegionServer implements HR
     // Start Server.  This service is like leases in that it internally runs
     // a thread.
     this.server.start();
+    // Create the log splitting worker and start it
+    this.splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
+        this.getConfiguration(), this.serverInfo.getServerName());
+    splitLogWorker.start();
     LOG.info("HRegionServer started at: " +
       this.serverInfo.getServerAddress().toString());
   }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,576 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This worker is spawned in every regionserver (should we also spawn one in
+ * the master?). The Worker waits for log splitting tasks to be put up by the
+ * {@link SplitLogManager} running in the master and races with other workers
+ * in other servers to acquire those tasks. The coordination is done via
+ * zookeeper. All the action takes place at /hbase/splitlog znode.
+ * <p>
+ * If a worker has successfully moved the task from state UNASSIGNED to
+ * OWNED then it owns the task. It keeps heart beating the manager by
+ * periodically moving the task from OWNED to OWNED state. On success it
+ * moves the task to SUCCESS. On unrecoverable error it moves task state to
+ * ERR. If it cannot continue but wants the master to retry the task then it
+ * moves the task state to RESIGNED.
+ * <p>
+ * The manager can take a task away from a worker by moving the task from
+ * OWNED to UNASSIGNED. In the absence of a global lock there is an
+ * unavoidable race here - a worker might have just finished its task when it
+ * is stripped of its ownership. Here we rely on the idempotency of the log
+ * splitting task for correctness.
+ */
+public class SplitLogWorker implements Runnable, Watcher {
+  private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
+
+  Thread worker;
+  private final String serverName;
+  private final TaskExecutor executor;
+  private long zkretries;
+
+  private Object taskReadyLock = new Object();
+  volatile int taskReadySeq = 0;
+  private volatile String currentTask = null;
+  private int currentVersion;
+  private volatile boolean exitWorker;
+  private Object grabTaskLock = new Object();
+  private boolean workerInGrabTask = false;
+  protected ZooKeeperWrapper watcher;
+
+  /** Creates a worker that runs the tasks it acquires through executor. */
+  public SplitLogWorker(ZooKeeperWrapper watcher, Configuration conf,
+      String serverName, TaskExecutor executor) {
+    this.watcher = watcher;
+    this.serverName = serverName;
+    this.executor = executor;
+    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3);
+  }
+
+  /** Creates a worker whose executor calls HLogSplitter.splitLogFileToTemp. */
+  public SplitLogWorker(ZooKeeperWrapper watcher, final Configuration conf,
+      final String serverName) {
+    this(watcher, conf, serverName, new TaskExecutor () {
+      @Override
+      public Status exec(String filename, CancelableProgressable p) {
+        Path rootDir;
+        FileSystem fileSystem;
+        try {
+          rootDir = FSUtils.getRootDir(conf);
+          fileSystem = rootDir.getFileSystem(conf);
+        } catch (IOException ioe) {
+          LOG.warn("could not find root dir or fs", ioe);
+          return Status.RESIGNED;
+        }
+        // TODO have to correctly figure out when log splitting has been
+        // interrupted or has encountered a transient error and when it has
+        // encountered a bad non-retry-able persistent error.
+        try {
+          FileStatus logStatus;
+          try {
+            logStatus = fileSystem.getFileStatus(new Path(filename));
+          } catch (FileNotFoundException fnfe) {
+            LOG.warn("nothing to do, file doesn't exist - " + filename);
+            return Status.DONE;
+          }
+          if (logStatus == null) {
+            LOG.warn("file status is null for file " + filename);
+            return Status.ERR;
+          }
+          String tmpName =
+            ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
+          if (!HLogSplitter.splitLogFileToTemp(rootDir, tmpName,
+              logStatus, fileSystem, conf, p)) {
+            return Status.PREEMPTED;
+          }
+        } catch (InterruptedIOException iioe) {
+          LOG.warn("log splitting of " + filename + " interrupted, resigning",
+              iioe);
+          return Status.RESIGNED;
+        } catch (IOException ioe) {
+          if (ioe.getCause() instanceof InterruptedException) {
+            LOG.warn("log splitting of " + filename + " interrupted, resigning",
+                ioe);
+            return Status.RESIGNED;
+          }
+          LOG.warn("log splitting of " + filename + " failed, returning error",
+              ioe);
+          return Status.ERR;
+        }
+        return Status.DONE;
+      }
+    });
+  }
+
+  @Override
+  public void run() {
+    try {
+      LOG.info("SplitLogWorker starting");
+      this.watcher.registerListener(this);
+      int res;
+      // wait for master to create the splitLogZnode
+      res = -1;
+      while (res == -1) {
+        try {
+          res = watcher.checkExists(watcher.splitLogZNode);
+        } catch (KeeperException e) {
+          // ignore
+          LOG.warn("Exception when checking for " + watcher.splitLogZNode +
+              " ... retrying", e);
+        }
+        if (res == -1) {
+          try {
+            LOG.info(watcher.splitLogZNode + " znode does not exist," +
+            " waiting for master to create one");
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode);
+            assert exitWorker == true;
+          }
+        }
+      }
+
+      taskLoop();
+    } catch (Throwable t) {
+      // only a logical error can cause an exception here. Printing it out
+      // to make debugging easier
+      LOG.error("Unexpected error ", t);
+    } finally {
+      LOG.info("SplitLogWorker exiting");
+    }
+  }
+
+  /**
+   * Wait for tasks to become available at /hbase/splitlog znode. Grab a task
+   * one at a time. This policy puts an upper-limit on the number of
+   * simultaneous log splitting that could be happening in a cluster.
+   * <p>
+   * Synchronization using {@link #taskReadySeq}, which is sampled before the
+   * task list is read, ensures that every task that is put up gets tried.
+   */
+  private void taskLoop() {
+    while (true) {
+      int seq_start = taskReadySeq;
+      List<String> paths = getTaskList();
+      if (paths == null) {
+        LOG.warn("Could not get tasks, did someone remove " +
+            this.watcher.splitLogZNode + " ... worker thread exiting.");
+        return;
+      }
+      int offset = (int)(Math.random() * paths.size()); // random start index
+      for (int i = 0; i < paths.size(); i ++) {
+        int idx = (i + offset) % paths.size();
+        // don't call ZKSplitLog.getNodeName() because that will lead to
+        // double encoding of the path name
+        grabTask(watcher.getZNode(watcher.splitLogZNode, paths.get(idx)));
+        if (exitWorker == true) {
+          return;
+        }
+      }
+      synchronized (taskReadyLock) {
+        while (seq_start == taskReadySeq) {
+          try {
+            taskReadyLock.wait();
+          } catch (InterruptedException e) {
+            LOG.warn("SplitLogWorker inteurrpted while waiting for task," +
+                " exiting", e);
+            assert exitWorker == true;
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Try to grab a 'lock' on the task zk node to own and execute the task. A
+   * task that is not UNASSIGNED, or whose ownership race is lost, is skipped.
+   * <p>
+   * @param path zk node for the task
+   */
+  private void grabTask(String path) {
+    Stat stat = new Stat();
+    long t = -1;
+    byte[] data;
+    synchronized (grabTaskLock) {
+      currentTask = path;
+      if (Thread.interrupted()) {
+        // skip the task; workerInGrabTask is deliberately not raised yet
+        // because the finally block that lowers it is not reached from here
+        return;
+      }
+      workerInGrabTask = true;
+    }
+    try {
+      if ((data = watcher.getData("", path, stat)) == null) {
+        tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
+        return;
+      }
+      if (TaskState.TASK_UNASSIGNED.equals(data) == false) {
+        tot_wkr_failed_to_grab_task_owned.incrementAndGet();
+        return;
+      }
+      assert ZKSplitLog.isRescanNode(watcher, currentTask) == false;
+      currentVersion = stat.getVersion();
+      if (ownTask(true) == false) {
+        tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
+        return;
+      }
+      LOG.info("worker " + serverName + " acquired task " + path);
+      tot_wkr_task_acquired.incrementAndGet();
+      getDataSetWatchAsync();
+
+      t = System.currentTimeMillis();
+      // progress() heartbeats via ownTask(); false reports lost ownership
+      TaskExecutor.Status status = executor.exec(
+          ZKSplitLog.getFileName(currentTask), new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          if (ownTask(false) == false) {
+            LOG.warn("Failed to heartbeat the task" + currentTask);
+            return false;
+          }
+          return true;
+        }
+      });
+      switch (status) {
+        case DONE:
+          endTask(TaskState.TASK_DONE, tot_wkr_task_done);
+          break;
+        case PREEMPTED:
+          tot_wkr_preempt_task.incrementAndGet();
+          LOG.warn("task execution prempted " + path);
+          break;
+        case ERR:
+          if (!exitWorker) {
+            endTask(TaskState.TASK_ERR, tot_wkr_task_err);
+            break;
+          }
+          // if the RS is exiting then there is probably a ton of stuff
+          // that can go wrong. Resign instead of signaling error.
+          //$FALL-THROUGH$
+        case RESIGNED:
+          if (exitWorker) {
+            LOG.info("task execution interrupted because worker is exiting " +
+                path);
+            endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned);
+          } else {
+            tot_wkr_preempt_task.incrementAndGet();
+            LOG.info("task execution interrupted via zk by manager " +
+                path);
+          }
+          break;
+      }
+    } finally {
+      if (t > 0) {
+        LOG.info("worker " + serverName + " done with task " + path +
+            " in " + (System.currentTimeMillis() - t) + "ms");
+      }
+      synchronized (grabTaskLock) {
+        workerInGrabTask = false;
+        // clear the interrupt from stopTask() otherwise the next task will
+        // suffer
+        Thread.interrupted();
+      }
+    }
+  }
+
+  /**
+   * Write OWNED (by this server) into the task znode at currentVersion. The
+   * first such write takes the task from UNASSIGNED to OWNED.
+   * <p>
+   * Later writes go from OWNED to OWNED and serve as the periodic heartbeat
+   * of the task progress.
+   * <p>
+   * @param isFirstTime if true, a KeeperException is not logged
+   * @return true if the task znode was successfully updated
+   */
+  private boolean ownTask(boolean isFirstTime) {
+    try {
+      Stat updated = this.watcher.setDataGetStat(currentTask,
+          TaskState.TASK_OWNED.get(serverName), currentVersion);
+      if (updated != null) {
+        currentVersion = updated.getVersion();
+        tot_wkr_task_heartbeat.incrementAndGet();
+        return true;
+      }
+      LOG.warn("zk.setData() returned null for path " + currentTask);
+    } catch (KeeperException ke) {
+      if (!isFirstTime) {
+        if (ke.code().equals(KeeperException.Code.NONODE)) {
+          LOG.debug("NONODE failed to assert ownership for " + currentTask, ke);
+        } else if (ke.code().equals(KeeperException.Code.BADVERSION)) {
+          LOG.debug("BADVERSION failed to assert ownership for " +
+              currentTask, ke);
+        } else {
+          LOG.warn("failed to assert ownership for " + currentTask, ke);
+        }
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted while trying to assert ownership of " +
+          currentTask + " " + StringUtils.stringifyException(ie));
+      Thread.currentThread().interrupt();
+    }
+    // every path that gets here failed to take or keep the ownership
+    tot_wkr_task_heartbeat_failed.incrementAndGet();
+    return false;
+  }
+
+  /**
+   * endTask() can fail and the only way to recover out of it is for the
+   * {@link SplitLogManager} to timeout the task node.
+   * @param ts
+   * @param ctr
+   */
+  private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) {
+    String path = currentTask;
+    currentTask = null;
+    try {
+      if (watcher.setData(path, ts.get(serverName), currentVersion)) {
+        LOG.info("successfully transitioned task " + path +
+            " to final state " + ts);
+        ctr.incrementAndGet();
+        return;
+      }
+      LOG.warn("failed to transistion task " + path + " to end state " + ts +
+          " because of version mismatch ");
+    } catch (KeeperException.BadVersionException bve) {
+      LOG.warn("transisition task " + path + " to " + ts +
+          " failed because of version mismatch", bve);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.fatal("logic error - end task " + path + " " + ts +
+          " failed because task doesn't exist", e);
+    } catch (KeeperException e) {
+      LOG.warn("failed to end task, " + path + " " + ts, e);
+    }
+    tot_wkr_final_transistion_failed.incrementAndGet();
+    return;
+  }
+
+  void getDataSetWatchAsync() {
+    this.watcher.getZooKeeper().getData(currentTask, this.watcher,
+        new GetDataAsyncCallback(), null); // result goes to the callback
+    tot_wkr_get_data_queued.incrementAndGet();
+  }
+
+  void getDataSetWatchSuccess(String path, byte[] data) {
+    synchronized (grabTaskLock) {
+      if (workerInGrabTask) {
+        // currentTask can change but that's ok, work on a snapshot of it
+        String taskpath = currentTask;
+        if (taskpath != null && taskpath.equals(path)) {
+          // Have to compare data. Cannot compare version because then there
+          // will be a race with ownTask().
+          // Cannot just check whether the node has been transitioned to
+          // UNASSIGNED because by the time this worker sets the data watch
+          // the node might have made two transitions - from owned by this
+          // worker, to unassigned, to owned by another worker.
+          if (! TaskState.TASK_OWNED.equals(data, serverName) &&
+              ! TaskState.TASK_DONE.equals(data, serverName) &&
+              ! TaskState.TASK_ERR.equals(data, serverName) &&
+              ! TaskState.TASK_RESIGNED.equals(data, serverName)) {
+            LOG.info("task " + taskpath + " preempted from server " +
+                serverName + " ... current task state and owner - " +
+                new String(data)); // NOTE(review): uses platform charset
+            stopTask();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Retry the data watch, but only while the worker is still on this task.
+   */
+  void getDataSetWatchFailure(String path) {
+    synchronized (grabTaskLock) {
+      String task = currentTask;
+      if (!workerInGrabTask || task == null || !task.equals(path)) {
+        // no point setting a watch on the task which this worker is not
+        // working upon anymore
+        return;
+      }
+      LOG.info("retrying data watch on " + path);
+      tot_wkr_get_data_retry.incrementAndGet();
+      getDataSetWatchAsync();
+    }
+  }
+
+  /** Dispatches data-changed and children-changed events; ignores the rest. */
+  @Override
+  public void process(WatchedEvent event) {
+    switch (event.getType()) {
+    case NodeDataChanged:
+      nodeDataChanged(event.getPath());
+      break;
+    case NodeChildrenChanged:
+      nodeChildrenChanged(event.getPath());
+      break;
+    case NodeCreated:
+    case NodeDeleted:
+    default:
+      // no-op
+      break;
+    }
+  }
+
+  private void nodeDataChanged(String path) {
+    // There is a self-generated dataChanged event every time ownTask()
+    // heartbeats the task znode, because that ups the znode version.
+    synchronized (grabTaskLock) {
+      if (workerInGrabTask) {
+        // currentTask can change, so work on a snapshot of it
+        String taskpath = currentTask;
+        if (taskpath!= null && taskpath.equals(path)) {
+          getDataSetWatchAsync(); // read the new data and watch again
+        }
+      }
+    }
+  }
+
+  private void nodeChildrenChanged(String path) {
+    if (path.equals(watcher.splitLogZNode)) {
+      LOG.debug("tasks arrived or departed");
+      synchronized (taskReadyLock) {
+        taskReadySeq++; // taskLoop() waits while this equals its sample
+        taskReadyLock.notify(); // wakes up the wait() in taskLoop()
+      }
+    }
+  }
+
+  private List<String> getTaskList() {
+    for (int i = 0; i < zkretries; i++) { // at most zkretries attempts
+      try {
+        return (this.watcher
+            .listChildrenAndWatchForNewChildren(this.watcher.splitLogZNode));
+      } catch (KeeperException e) {
+        LOG.warn("Could not get children of znode " +
+            this.watcher.splitLogZNode, e);
+        try {
+          Thread.sleep(1000); // pause before the next attempt
+        } catch (InterruptedException e1) {
+          LOG.warn("Interrupted while trying to get task list ...", e1);
+          Thread.currentThread().interrupt();
+          return null; // interrupted: stop retrying
+        }
+      }
+    }
+    LOG.warn("Tried " + zkretries + " times, still couldn't fetch " +
+        "children of " + watcher.splitLogZNode + " giving up");
+    return null;
+  }
+
+  /**
+   * If the worker is doing a task i.e. splitting a log file then stop the task
+   * by interrupting the worker thread. It doesn't make the worker thread exit.
+   */
+  void stopTask() {
+    LOG.info("Sending interrupt to stop the worker thread");
+    worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
+  }
+
+
+  /**
+   * Create the SplitLogWorker thread and start it.
+   */
+  public void start() {
+    worker = new Thread(null, this, "SplitLogWorker-" + serverName);
+    // clear the exit flag that stop() raises before the thread runs
+    exitWorker = false;
+    worker.start();
+  }
+
+  /**
+   * Stop the SplitLogWorker thread by setting exitWorker and interrupting it.
+   */
+  public void stop() {
+    exitWorker = true;
+    stopTask();
+  }
+
+  /**
+   * Asynchronous handler for the results of the zk get-data-set-watch call.
+   */
+  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
+    private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, byte[] data,
+        Stat stat) {
+      tot_wkr_get_data_result.incrementAndGet();
+      if (rc != 0) {
+        // data is only meaningful when rc is OK, so it is not touched here
+        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
+        getDataSetWatchFailure(path);
+        return;
+      }
+      byte[] newData = RecoverableZooKeeper.removeMetaData(data);
+      getDataSetWatchSuccess(path, newData);
+    }
+  }
+
+  /**
+   * Objects implementing this interface actually do the task that has been
+   * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
+   * guarantee that two workers will not be executing the same task, it is
+   * better to have workers prepare the task and then have the
+   * {@link SplitLogManager} commit the work in
+   * {@link SplitLogManager.TaskFinisher}.
+   */
+  static public interface TaskExecutor {
+    static public enum Status {
+      DONE(), // grabTask() moves the task znode to TASK_DONE
+      ERR(), // TASK_ERR, or treated as RESIGNED when the worker is exiting
+      RESIGNED(), // TASK_RESIGNED, but only when the worker is exiting
+      PREEMPTED(); // only counted and logged, the task znode is not updated
+    }
+    public Status exec(String name, CancelableProgressable p);
+  }
+}