Posted to commits@hbase.apache.org by je...@apache.org on 2013/11/24 20:31:46 UTC

svn commit: r1545056 - in /hbase/branches/0.96: hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ hbase...

Author: jeffreyz
Date: Sun Nov 24 19:31:45 2013
New Revision: 1545056

URL: http://svn.apache.org/r1545056
Log:
HBASE-9736: Allow more than one log splitter per RS
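
The new behaviour is driven by the hbase.regionserver.wal.max.splitters setting (default 2,
see SplitLogWorker.DEFAULT_MAX_SPLITTERS below). Not part of the commit: a minimal sketch of
raising that limit programmatically, mirroring what the modified tests do; the class name is
made up and the value 3 is arbitrary.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MaxSplittersConfig {
      public static void main(String[] args) {
        // Allow up to 3 concurrent WAL-splitting tasks on each region server
        // (the default is SplitLogWorker.DEFAULT_MAX_SPLITTERS = 2).
        Configuration conf = HBaseConfiguration.create();
        conf.setInt("hbase.regionserver.wal.max.splitters", 3);
      }
    }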

Added:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
Modified:
    hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
    hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java

Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java?rev=1545056&r1=1545055&r2=1545056&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java Sun Nov 24 19:31:45 2013
@@ -250,7 +250,15 @@ public enum EventType {
    * 
    * RS_PARALLEL_SEEK
    */
-  RS_PARALLEL_SEEK          (80, ExecutorType.RS_PARALLEL_SEEK);
+  RS_PARALLEL_SEEK          (80, ExecutorType.RS_PARALLEL_SEEK),
+  
+  /**
+   * RS WAL recovery work items (either creating recovered.edits files or directly replaying
+   * WALs) to be executed on the RS.<br>
+   * 
+   * RS_LOG_REPLAY
+   */
+  RS_LOG_REPLAY             (81, ExecutorType.RS_LOG_REPLAY_OPS);
 
   private final int code;
   private final ExecutorType executor;

Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java?rev=1545056&r1=1545055&r2=1545056&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java Sun Nov 24 19:31:45 2013
@@ -44,7 +44,8 @@ public enum ExecutorType {
   RS_CLOSE_REGION            (23),
   RS_CLOSE_ROOT              (24),
   RS_CLOSE_META              (25),
-  RS_PARALLEL_SEEK           (26);
+  RS_PARALLEL_SEEK           (26),
+  RS_LOG_REPLAY_OPS          (27);
 
   ExecutorType(int value) {}
 

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1545056&r1=1545055&r2=1545056&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Nov 24 19:31:45 2013
@@ -1526,6 +1526,8 @@ public class HRegionServer implements Cl
       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
     }
+    this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
+      conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
 
     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
         uncaughtExceptionHandler);
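
Not part of the commit: the hunk above sizes a new, dedicated RS_LOG_REPLAY_OPS thread pool
from hbase.regionserver.wal.max.splitters. Below is a self-contained sketch of standing up the
same pool outside a region server, using only calls that also appear in the test changes
further down; "example-rs" and the pool size 2 are illustrative.

    import org.apache.hadoop.hbase.executor.ExecutorService;
    import org.apache.hadoop.hbase.executor.ExecutorType;

    public class LogReplayPoolExample {
      public static void main(String[] args) {
        // Start a standalone RS_LOG_REPLAY_OPS pool, sized like the region server does above.
        ExecutorService service = new ExecutorService("example-rs");
        service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 2);
        service.shutdown();
      }
    }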

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1545056&r1=1545055&r2=1545056&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Sun Nov 24 19:31:45 2013
@@ -25,23 +25,27 @@ import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -78,12 +82,17 @@ import org.apache.zookeeper.data.Stat;
  */
 @InterfaceAudience.Private
 public class SplitLogWorker extends ZooKeeperListener implements Runnable {
+  public static final int DEFAULT_MAX_SPLITTERS = 2;
+
   private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
   private static final int checkInterval = 5000; // 5 seconds
+  private static final int FAILED_TO_OWN_TASK = -1;
 
   Thread worker;
   private final ServerName serverName;
   private final TaskExecutor splitTaskExecutor;
+  // thread pool which executes recovery work
+  private final ExecutorService executorService;
 
   private final Object taskReadyLock = new Object();
   volatile int taskReadySeq = 0;
@@ -95,9 +104,11 @@ public class SplitLogWorker extends ZooK
   private final int report_period;
   private RegionServerServices server = null;
   private Configuration conf = null;
+  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
+  private int maxConcurrentTasks = 0;
 
-  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
-      RegionServerServices server, TaskExecutor splitTaskExecutor) {
+  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server,
+      TaskExecutor splitTaskExecutor) {
     super(watcher);
     this.server = server;
     this.serverName = server.getServerName();
@@ -105,16 +116,9 @@ public class SplitLogWorker extends ZooK
     report_period = conf.getInt("hbase.splitlog.report.period",
       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
     this.conf = conf;
-  }
-
-  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
-      TaskExecutor splitTaskExecutor) {
-    super(watcher);
-    this.serverName = serverName;
-    this.splitTaskExecutor = splitTaskExecutor;
-    report_period = conf.getInt("hbase.splitlog.report.period",
-      conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
-    this.conf = conf;
+    this.executorService = this.server.getExecutorService();
+    this.maxConcurrentTasks =
+        conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
   }
 
   public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
@@ -238,11 +242,18 @@ public class SplitLogWorker extends ZooK
           break;
         }
       }
-      for (int i = 0; i < paths.size(); i ++) {
+      int numTasks = paths.size();
+      for (int i = 0; i < numTasks; i++) {
         int idx = (i + offset) % paths.size();
         // don't call ZKSplitLog.getNodeName() because that will lead to
         // double encoding of the path name
-        grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+        if (this.calculateAvailableSplitters(numTasks) > 0) {
+          grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+        } else {
+          LOG.debug("Current region server " + this.serverName + " has "
+              + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+          break;
+        }
         if (exitWorker) {
           return;
         }
@@ -337,72 +348,33 @@ public class SplitLogWorker extends ZooK
         return;
       }
 
-      currentVersion = stat.getVersion();
-      if (!attemptToOwnTask(true)) {
+      currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
+      if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
         return;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
-        endTask(new SplitLogTask.Done(this.serverName),
-          SplitLogCounters.tot_wkr_task_acquired_rescan);
+        HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
+          SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
         return;
       }
+
       LOG.info("worker " + serverName + " acquired task " + path);
       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
       getDataSetWatchAsync();
 
-      t = System.currentTimeMillis();
-      TaskExecutor.Status status;
-
-      status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
-          new CancelableProgressable() {
-
-        private long last_report_at = 0;
+      submitTask(path, currentVersion, this.report_period);
 
-        @Override
-        public boolean progress() {
-          long t = EnvironmentEdgeManager.currentTimeMillis();
-          if ((t - last_report_at) > report_period) {
-            last_report_at = t;
-            if (!attemptToOwnTask(false)) {
-              LOG.warn("Failed to heartbeat the task" + currentTask);
-              return false;
-            }
-          }
-          return true;
-        }
-      });
-
-      switch (status) {
-        case DONE:
-          endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
-          break;
-        case PREEMPTED:
-          SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
-          LOG.warn("task execution prempted " + path);
-          break;
-        case ERR:
-          if (!exitWorker) {
-            endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err);
-            break;
-          }
-          // if the RS is exiting then there is probably a tons of stuff
-          // that can go wrong. Resign instead of signaling error.
-          //$FALL-THROUGH$
-        case RESIGNED:
-          if (exitWorker) {
-            LOG.info("task execution interrupted because worker is exiting " + path);
-          }
-          endTask(new SplitLogTask.Resigned(this.serverName), 
-            SplitLogCounters.tot_wkr_task_resigned);
-          break;
+      // after a successful submit, sleep a little bit to allow other RSs to grab the remaining tasks
+      try {
+        int sleepTime = RandomUtils.nextInt(500) + 500;
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while yielding for other region servers", e);
+        Thread.currentThread().interrupt();
       }
     } finally {
-      if (t > 0) {
-        LOG.info("worker " + serverName + " done with task " + path +
-            " in " + (System.currentTimeMillis() - t) + "ms");
-      }
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
         // clear the interrupt from stopTask() otherwise the next task will
@@ -412,76 +384,109 @@ public class SplitLogWorker extends ZooK
     }
   }
 
+
   /**
-   * Try to own the task by transitioning the zk node data from UNASSIGNED to
-   * OWNED.
+   * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
    * <p>
-   * This method is also used to periodically heartbeat the task progress by
-   * transitioning the node from OWNED to OWNED.
+   * This method is also used to periodically heartbeat the task progress by transitioning the node
+   * from OWNED to OWNED.
    * <p>
-   * @return true if task path is successfully locked
+   * @param isFirstTime true when first grabbing the task, false for a periodic heartbeat
+   * @param zkw ZooKeeper watcher to use
+   * @param server name of the server attempting to own the task
+   * @param task task node path
+   * @param taskZKVersion expected ZK version of the task node
+   * @return the latest ZK version of the task node if ownership was asserted, otherwise -1
    */
-  private boolean attemptToOwnTask(boolean isFirstTime) {
+  protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
+      ServerName server, String task, int taskZKVersion) {
+    int latestZKVersion = FAILED_TO_OWN_TASK;
     try {
-      SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
-      Stat stat =
-        this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
+      SplitLogTask slt = new SplitLogTask.Owned(server);
+      Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
       if (stat == null) {
-        LOG.warn("zk.setData() returned null for path " + currentTask);
+        LOG.warn("zk.setData() returned null for path " + task);
         SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
-        return (false);
+        return FAILED_TO_OWN_TASK;
       }
-      currentVersion = stat.getVersion();
+      latestZKVersion = stat.getVersion();
       SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
-      return (true);
+      return latestZKVersion;
     } catch (KeeperException e) {
       if (!isFirstTime) {
         if (e.code().equals(KeeperException.Code.NONODE)) {
-          LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
+          LOG.warn("NONODE failed to assert ownership for " + task, e);
         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
-          LOG.warn("BADVERSION failed to assert ownership for " +
-              currentTask, e);
+          LOG.warn("BADVERSION failed to assert ownership for " + task, e);
         } else {
-          LOG.warn("failed to assert ownership for " + currentTask, e);
+          LOG.warn("failed to assert ownership for " + task, e);
         }
       }
     } catch (InterruptedException e1) {
       LOG.warn("Interrupted while trying to assert ownership of " +
-          currentTask + " " + StringUtils.stringifyException(e1));
+          task + " " + StringUtils.stringifyException(e1));
       Thread.currentThread().interrupt();
     }
     SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
-    return (false);
+    return FAILED_TO_OWN_TASK;
   }
 
   /**
-   * endTask() can fail and the only way to recover out of it is for the
-   * {@link SplitLogManager} to timeout the task node.
-   * @param slt
-   * @param ctr
+   * Calculates how many more splitters this region server may create, based on the expected
+   * average number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by
+   * configuration. <br>
+   * At any given time an RS runs at most MIN(expected tasks per RS, hard upper bound) splitters.
+   * @param numTasks current total number of available tasks
+   * @return number of additional splitters this RS may spawn (zero or negative means none)
    */
-  private void endTask(SplitLogTask slt, AtomicLong ctr) {
-    String path = currentTask;
-    currentTask = null;
+  private int calculateAvailableSplitters(int numTasks) {
+    // at least one RS (itself) is available
+    int availableRSs = 1;
     try {
-      if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
-          currentVersion)) {
-        LOG.info("successfully transitioned task " + path + " to final state " + slt);
-        ctr.incrementAndGet();
-        return;
-      }
-      LOG.warn("failed to transistion task " + path + " to end state " + slt +
-          " because of version mismatch ");
-    } catch (KeeperException.BadVersionException bve) {
-      LOG.warn("transisition task " + path + " to " + slt +
-          " failed because of version mismatch", bve);
-    } catch (KeeperException.NoNodeException e) {
-      LOG.fatal("logic error - end task " + path + " " + slt +
-          " failed because task doesn't exist", e);
+      List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
+      availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
     } catch (KeeperException e) {
-      LOG.warn("failed to end task, " + path + " " + slt, e);
+      // do nothing
+      LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
     }
-    SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+
+    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
+    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
+    // calculate how many more splitters we could spawn
+    return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
+  }
+
+  /**
+   * Submit a log split task to the executor service
+   * @param curTask task node path
+   * @param curTaskZKVersion current ZK version of the task node
+   */
+  void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
+    final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
+
+    CancelableProgressable reporter = new CancelableProgressable() {
+      private long last_report_at = 0;
+
+      @Override
+      public boolean progress() {
+        long t = EnvironmentEdgeManager.currentTimeMillis();
+        if ((t - last_report_at) > reportPeriod) {
+          last_report_at = t;
+          int latestZKVersion =
+              attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue());
+          if (latestZKVersion < 0) {
+            LOG.warn("Failed to heartbeat the task " + curTask);
+            return false;
+          }
+          zkVersion.setValue(latestZKVersion);
+        }
+        return true;
+      }
+    };
+    
+    HLogSplitterHandler hsh =
+        new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress,
+            this.splitTaskExecutor);
+    this.executorService.submit(hsh);
   }
 
   void getDataSetWatchAsync() {
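
Not part of the commit: a small self-contained sketch of the splitter-count arithmetic that
calculateAvailableSplitters() above performs, with illustrative numbers matching the scenario
exercised by testAcquireMultiTasksByAvgTasksPerRS further down.

    public class AvailableSplittersExample {
      // Mirrors SplitLogWorker.calculateAvailableSplitters(int numTasks) above.
      static int availableSplitters(int numTasks, int availableRSs,
          int maxConcurrentTasks, int tasksInProgress) {
        int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
        expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least one
        return Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress;
      }

      public static void main(String[] args) {
        // 3 pending tasks, 2 live region servers, hard limit 3, nothing running yet:
        // ceil(3 / 2) = 2, min(2, 3) = 2, so this RS may grab 2 tasks -- the same
        // figure that testAcquireMultiTasksByAvgTasksPerRS asserts below.
        System.out.println(availableSplitters(3, 2, 3, 0)); // prints 2
      }
    }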

Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java?rev=1545056&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (added)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java Sun Nov 24 19:31:45 2013
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles the splitting of a single WAL (one log split task)
+ */
+@InterfaceAudience.Private
+public class HLogSplitterHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
+  private final ServerName serverName;
+  private final String curTask;
+  private final String wal;
+  private final ZooKeeperWatcher zkw;
+  private final CancelableProgressable reporter;
+  private final AtomicInteger inProgressTasks;
+  private final MutableInt curTaskZKVersion;
+  private final TaskExecutor splitTaskExecutor;
+
+  public HLogSplitterHandler(final Server server, String curTask,
+      final MutableInt curTaskZKVersion,
+      CancelableProgressable reporter,
+      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
+    super(server, EventType.RS_LOG_REPLAY);
+    this.curTask = curTask;
+    this.wal = ZKSplitLog.getFileName(curTask);
+    this.reporter = reporter;
+    this.inProgressTasks = inProgressTasks;
+    this.inProgressTasks.incrementAndGet();
+    this.serverName = server.getServerName();
+    this.zkw = server.getZooKeeper();
+    this.curTaskZKVersion = curTaskZKVersion;
+    this.splitTaskExecutor = splitTaskExecutor;
+  }
+
+  @Override
+  public void process() throws IOException {
+    long startTime = System.currentTimeMillis();
+    try {
+      Status status = this.splitTaskExecutor.exec(wal, reporter);
+      switch (status) {
+      case DONE:
+        endTask(zkw, new SplitLogTask.Done(this.serverName),
+          SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
+        break;
+      case PREEMPTED:
+        SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
+        LOG.warn("task execution preempted " + wal);
+        break;
+      case ERR:
+        if (server != null && !server.isStopped()) {
+          endTask(zkw, new SplitLogTask.Err(this.serverName),
+            SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
+          break;
+        }
+        // if the RS is exiting then there are probably tons of things
+        // that can go wrong. Resign instead of signaling error.
+        //$FALL-THROUGH$
+      case RESIGNED:
+        if (server != null && server.isStopped()) {
+          LOG.info("task execution interrupted because worker is exiting " + curTask);
+        }
+        endTask(zkw, new SplitLogTask.Resigned(this.serverName),
+          SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
+        break;
+      }
+    } finally {
+      LOG.info("worker " + serverName + " done with task " + curTask + " in "
+          + (System.currentTimeMillis() - startTime) + "ms");
+      this.inProgressTasks.decrementAndGet();
+    }
+  }
+  
+  /**
+   * endTask() can fail and the only way to recover from that is for the
+   * {@link SplitLogManager} to timeout the task node.
+   * @param slt the final state to transition the task node to
+   * @param ctr counter to increment on a successful transition
+   */
+  public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task,
+      int taskZKVersion) {
+    try {
+      if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) {
+        LOG.info("successfully transitioned task " + task + " to final state " + slt);
+        ctr.incrementAndGet();
+        return;
+      }
+      LOG.warn("failed to transition task " + task + " to end state " + slt
+          + " because of version mismatch ");
+    } catch (KeeperException.BadVersionException bve) {
+      LOG.warn("transition task " + task + " to " + slt
+          + " failed because of version mismatch", bve);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.fatal(
+        "logic error - end task " + task + " " + slt
+          + " failed because task doesn't exist", e);
+    } catch (KeeperException e) {
+      LOG.warn("failed to end task, " + task + " " + slt, e);
+    }
+    SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+  }
+}

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1545056&r1=1545055&r2=1545056&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sun Nov 24 19:31:45 2013
@@ -798,7 +798,7 @@ public class HLogSplitter {
     private OutputSink outputSink = null;
 
     WriterThread(OutputSink sink, int i) {
-      super("WriterThread-" + i);
+      super(Thread.currentThread().getName() + "-Writer-" + i);
       outputSink = sink;
     }
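
Not part of the commit: a tiny illustration of the writer-thread naming change above. With
several concurrent split tasks per RS, each writer thread now carries its parent splitter
thread's name, so log output can be traced back to a specific task. Class and thread names
here are made up.

    public class WriterThreadNamingExample {
      public static void main(String[] args) throws InterruptedException {
        // "parent" stands in for a log-splitting handler thread; its writer thread
        // takes the parent's name plus "-Writer-<i>", as in the change above.
        Thread parent = new Thread(new Runnable() {
          @Override
          public void run() {
            Thread writer = new Thread(Thread.currentThread().getName() + "-Writer-0");
            System.out.println(writer.getName()); // prints "RS_LOG_REPLAY_OPS-example-Writer-0"
          }
        }, "RS_LOG_REPLAY_OPS-example");
        parent.start();
        parent.join();
      }
    }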
 

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1545056&r1=1545055&r2=1545056&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Sun Nov 24 19:31:45 2013
@@ -148,6 +148,7 @@ public class TestDistributedLogSplitting
     conf.setInt("zookeeper.recovery.retry", 0);
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
+    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
     TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.setDFSCluster(dfsCluster);
     TEST_UTIL.setZkCluster(zkCluster);
@@ -185,6 +186,7 @@ public class TestDistributedLogSplitting
   @Test (timeout=300000)
   public void testRecoveredEdits() throws Exception {
     LOG.info("testRecoveredEdits");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     startCluster(NUM_RS);
 
@@ -241,10 +243,12 @@ public class TestDistributedLogSplitting
         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
       FileStatus[] files = fs.listStatus(editsdir);
-      assertEquals(1, files.length);
-      int c = countHLog(files[0].getPath(), fs, conf);
-      count += c;
-      LOG.info(c + " edits in " + files[0].getPath());
+      assertTrue(files.length > 1);
+      for (int i = 0; i < files.length; i++) {
+        int c = countHLog(files[i].getPath(), fs, conf);
+        count += c;
+      }
+      LOG.info(count + " edits in " + files.length + " recovered edits files.");
     }
     assertEquals(NUM_LOG_LINES, count);
   }
@@ -252,6 +256,7 @@ public class TestDistributedLogSplitting
   @Test(timeout = 300000)
   public void testLogReplayWithNonMetaRSDown() throws Exception {
     LOG.info("testLogReplayWithNonMetaRSDown");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
     conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     startCluster(NUM_RS);

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1545056&r1=1545055&r2=1545056&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Sun Nov 24 19:31:45 2013
@@ -20,18 +20,25 @@ package org.apache.hadoop.hbase.regionse
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -56,6 +63,7 @@ public class TestSplitLogWorker {
     new HBaseTestingUtility();
   private ZooKeeperWatcher zkw;
   private SplitLogWorker slw;
+  private ExecutorService executorService;
 
   private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
       throws Exception {
@@ -69,14 +77,14 @@ public class TestSplitLogWorker {
     return waitForCounterBoolean(ctr, oldval, newval, timems, true);
   }
 
-  private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
+  private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
       long timems, boolean failIfTimeout) throws Exception {
 
     long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
       new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
-        return (ctr.get() != oldval);
+        return (ctr.get() >= newval);
       }
     });
 
@@ -99,11 +107,18 @@ public class TestSplitLogWorker {
     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
     assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
     LOG.debug(zkw.splitLogZNode + " created");
+    ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
+    assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
     SplitLogCounters.resetCounters();
+    executorService = new ExecutorService("TestSplitLogWorker");
+    executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
   }
 
   @After
   public void teardown() throws Exception {
+    if (executorService != null) {
+      executorService.shutdown();
+    }
     TEST_UTIL.shutdownMiniZKCluster();
   }
 
@@ -132,11 +147,13 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final String TATAS = "tatas";
     final ServerName RS = ServerName.valueOf("rs,1,1");
+    RegionServerServices mockedRS = getRegionServer(RS);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
       new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-    SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask);
+    SplitLogWorker slw =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
@@ -169,9 +186,12 @@ public class TestSplitLogWorker {
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
       new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
-
-    SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR1, neverEndingTask);
-    SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR2, neverEndingTask);
+    RegionServerServices mockedRS1 = getRegionServer(SVR1);
+    RegionServerServices mockedRS2 = getRegionServer(SVR2);
+    SplitLogWorker slw1 =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
+    SplitLogWorker slw2 =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
     slw1.start();
     slw2.start();
     try {
@@ -195,7 +215,9 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
     final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
-    SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+    RegionServerServices mockedRS = getRegionServer(SRV);
+    SplitLogWorker slw =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
@@ -226,7 +248,9 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
     final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
-    SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+    RegionServerServices mockedRS = getRegionServer(SRV);
+    SplitLogWorker slw =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
@@ -266,7 +290,8 @@ public class TestSplitLogWorker {
     LOG.info("testRescan");
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("svr,1,1");
-    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+    RegionServerServices mockedRS = getRegionServer(SRV);
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     Thread.yield(); // let the worker start
     Thread.sleep(100);
@@ -312,4 +337,100 @@ public class TestSplitLogWorker {
     assertEquals(2, num);
   }
 
+  @Test
+  public void testAcquireMultiTasks() throws Exception {
+    LOG.info("testAcquireMultiTasks");
+    SplitLogCounters.resetCounters();
+    final String TATAS = "tatas";
+    final ServerName RS = ServerName.valueOf("rs,1,1");
+    final int maxTasks = 3;
+    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
+    RegionServerServices mockedRS = getRegionServer(RS);
+
+    for (int i = 0; i < maxTasks; i++) {
+      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+    slw.start();
+    try {
+      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, 3000);
+      for (int i = 0; i < maxTasks; i++) {
+        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
+        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+        assertTrue(slt.isOwned(RS));
+      }
+    } finally {
+      stopSplitLogWorker(slw);
+    }
+  }
+
+  /**
+   * The test checks that SplitLogWorker does not spawn more splitters than the expected number of
+   * tasks per RS.
+   * @throws Exception
+   */
+  @Test
+  public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
+    LOG.info("testAcquireMultiTasksByAvgTasksPerRS");
+    SplitLogCounters.resetCounters();
+    final String TATAS = "tatas";
+    final ServerName RS = ServerName.valueOf("rs,1,1");
+    final ServerName RS2 = ServerName.valueOf("rs,1,2");
+    final int maxTasks = 3;
+    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
+    RegionServerServices mockedRS = getRegionServer(RS);
+
+    // create two RS nodes
+    String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
+    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
+    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+    for (int i = 0; i < maxTasks; i++) {
+      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
+        Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+    }
+
+    SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+    slw.start();
+    try {
+      int acquiredTasks = 0;
+      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, 3000);
+      for (int i = 0; i < maxTasks; i++) {
+        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
+        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+        if (slt.isOwned(RS)) {
+          acquiredTasks++;
+        }
+      }
+      assertEquals(2, acquiredTasks);
+    } finally {
+      stopSplitLogWorker(slw);
+    }
+  }
+
+  /**
+   * Create a mocked region server service instance
+   * @param name server name to report from the mocked region server
+   * @return a mocked RegionServerServices wired to the test ZooKeeper watcher and executor service
+   */
+  private RegionServerServices getRegionServer(ServerName name) {
+
+    RegionServerServices mockedServer = mock(RegionServerServices.class);
+    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
+    when(mockedServer.getServerName()).thenReturn(name);
+    when(mockedServer.getZooKeeper()).thenReturn(zkw);
+    when(mockedServer.isStopped()).thenReturn(false);
+    when(mockedServer.getExecutorService()).thenReturn(executorService);
+
+    return mockedServer;
+  }
+
 }