You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2014/06/15 20:43:56 UTC

[1/2] HBASE-11094: Distributed log replay is incompatible for rolling restarts

Repository: hbase
Updated Branches:
  refs/heads/0.98 9ce175146 -> 34ae4a94d


http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
index 4ac800e..9bfdeed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -55,11 +56,12 @@ public class HLogSplitterHandler extends EventHandler {
   private final AtomicInteger inProgressTasks;
   private final MutableInt curTaskZKVersion;
   private final TaskExecutor splitTaskExecutor;
+  private final RecoveryMode mode;
 
   public HLogSplitterHandler(final Server server, String curTask,
       final MutableInt curTaskZKVersion,
       CancelableProgressable reporter,
-      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
+      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
 	  super(server, EventType.RS_LOG_REPLAY);
     this.curTask = curTask;
     this.wal = ZKSplitLog.getFileName(curTask);
@@ -70,16 +72,17 @@ public class HLogSplitterHandler extends EventHandler {
     this.zkw = server.getZooKeeper();
     this.curTaskZKVersion = curTaskZKVersion;
     this.splitTaskExecutor = splitTaskExecutor;
+    this.mode = mode;
   }
 
   @Override
   public void process() throws IOException {
     long startTime = System.currentTimeMillis();
     try {
-      Status status = this.splitTaskExecutor.exec(wal, reporter);
+      Status status = this.splitTaskExecutor.exec(wal, mode, reporter);
       switch (status) {
       case DONE:
-        endTask(zkw, new SplitLogTask.Done(this.serverName),
+        endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode),
           SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
         break;
       case PREEMPTED:
@@ -88,7 +91,7 @@ public class HLogSplitterHandler extends EventHandler {
         break;
       case ERR:
         if (server != null && !server.isStopped()) {
-          endTask(zkw, new SplitLogTask.Err(this.serverName),
+          endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode),
             SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
           break;
         }
@@ -99,7 +102,7 @@ public class HLogSplitterHandler extends EventHandler {
         if (server != null && server.isStopped()) {
           LOG.info("task execution interrupted because worker is exiting " + curTask);
         }
-        endTask(zkw, new SplitLogTask.Resigned(this.serverName),
+        endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode),
           SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
         break;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index d709b8b..545c253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
@@ -167,7 +168,7 @@ public class HLogSplitter {
   private final int minBatchSize;
 
   HLogSplitter(Configuration conf, Path rootDir,
-      FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
+      FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, RecoveryMode mode) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -184,7 +185,7 @@ public class HLogSplitter {
     // a larger minBatchSize may slow down recovery because replay writer has to wait for
     // enough edits before replaying them
     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
+    this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
 
     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
     if (zkw != null && this.distributedLogReplay) {
@@ -218,8 +219,8 @@ public class HLogSplitter {
    */
   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
-      ZooKeeperWatcher zkw) throws IOException {
-    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
+      ZooKeeperWatcher zkw, RecoveryMode mode) throws IOException {
+    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, mode);
     return s.splitLogFile(logfile, reporter);
   }
 
@@ -233,7 +234,8 @@ public class HLogSplitter {
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null);
+        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, 
+          RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
@@ -1972,14 +1974,4 @@ public class HLogSplitter {
 
     return mutations;
   }
-
-  /**
-   * Returns if distributed log replay is turned on or not
-   * @param conf
-   * @return true when distributed log replay is turned on
-   */
-  public static boolean isDistributedLogReplay(Configuration conf) {
-    return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
index d7082ed..b46ec28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -120,7 +121,8 @@ public class TestSerialization {
 
   @Test
   public void testSplitLogTask() throws DeserializationException {
-    SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
+    SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), 
+      RecoveryMode.LOG_REPLAY);
     byte [] bytes = slt.toByteArray();
     SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
     assertTrue(slt.equals(sltDeserialized));

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 8cdaef6..6595333 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -69,7 +69,7 @@ public class TestMultiParallel {
   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
   private static final byte [][] KEYS = makeKeys();
 
-  private static final int slaves = 2; // also used for testing HTable pool size
+  private static final int slaves = 5; // also used for testing HTable pool size
 
   @BeforeClass public static void beforeClass() throws Exception {
     ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@@ -696,4 +696,4 @@ public class TestMultiParallel {
       validateEmpty(result);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 8ba8012..1ba8eac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -638,10 +639,14 @@ public class TestAssignmentManager {
     DeadServer deadServers = new DeadServer();
     deadServers.add(SERVERNAME_A);
     // I need a services instance that will return the AM
+    MasterFileSystem fs = Mockito.mock(MasterFileSystem.class);
+    Mockito.doNothing().when(fs).setLogRecoveryMode();
+    Mockito.when(fs.getLogRecoveryMode()).thenReturn(RecoveryMode.LOG_REPLAY);
     MasterServices services = Mockito.mock(MasterServices.class);
     Mockito.when(services.getAssignmentManager()).thenReturn(am);
     Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
     Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
+    Mockito.when(services.getMasterFileSystem()).thenReturn(fs);
     ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
       services, deadServers, SERVERNAME_A, false);
     am.failoverCleanupDone.set(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 72bd0d1..c5f869d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -159,6 +159,7 @@ public class TestDistributedLogSplitting {
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
     conf.setInt("hbase.regionserver.wal.max.splitters", 3);
+    conf.setInt("hfile.format.version", 3);
     TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.setDFSCluster(dfsCluster);
     TEST_UTIL.setZkCluster(zkCluster);
@@ -1169,7 +1170,6 @@ public class TestDistributedLogSplitting {
     LOG.info("testSameVersionUpdatesRecovery");
     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
@@ -1261,7 +1261,6 @@ public class TestDistributedLogSplitting {
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
     conf.setInt("hbase.hstore.compactionThreshold", 3);
-    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
index 2fbb849..8f57ee4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
@@ -94,8 +94,8 @@ public class TestMasterFileSystem {
     // Create a ZKW to use in the test
     ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath),
-      new SplitLogTask.Owned(inRecoveryServerName).toByteArray(), Ids.OPEN_ACL_UNSAFE,
-      CreateMode.PERSISTENT);
+      new SplitLogTask.Owned(inRecoveryServerName, fs.getLogRecoveryMode()).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, staleRegion);
     ZKUtil.createWithParents(zkw, staleRegionPath);
     String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index ed51484..4988742 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -48,16 +48,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -90,6 +95,7 @@ public class TestSplitLogManager {
   private SplitLogManager slm;
   private Configuration conf;
   private int to;
+  private RecoveryMode mode;
 
   private static HBaseTestingUtility TEST_UTIL;
 
@@ -133,7 +139,11 @@ public class TestSplitLogManager {
     conf.setInt("hbase.splitlog.manager.timeout", to);
     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+    conf.setInt("hfile.format.version", 3);
     to = to + 4 * 100;
+    
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -213,7 +223,7 @@ public class TestSplitLogManager {
     LOG.info("TestOrphanTaskAcquisition");
 
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
-    SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER);
+    SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -238,7 +248,7 @@ public class TestSplitLogManager {
         " startup");
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
     //create an unassigned orphan task
-    SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER);
+    SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
@@ -274,19 +284,19 @@ public class TestSplitLogManager {
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
     final ServerName worker2 = ServerName.valueOf("worker2,1,1");
     final ServerName worker3 = ServerName.valueOf("worker3,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
     int version1 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version1 > version);
-    slt = new SplitLogTask.Owned(worker2);
+    slt = new SplitLogTask.Owned(worker2, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
     waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
     int version2 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version2 > version1);
-    slt = new SplitLogTask.Owned(worker3);
+    slt = new SplitLogTask.Owned(worker3, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
@@ -304,7 +314,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     int version = ZKUtil.checkExists(zkw, tasknode);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     waitForCounter(new Expr() {
@@ -331,7 +341,7 @@ public class TestSplitLogManager {
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Done(worker1);
+    SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     synchronized (batch) {
       while (batch.installed != batch.done) {
@@ -352,7 +362,7 @@ public class TestSplitLogManager {
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Err(worker1);
+    SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
 
     synchronized (batch) {
@@ -376,7 +386,7 @@ public class TestSplitLogManager {
     assertEquals(tot_mgr_resubmit.get(), 0);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
     assertEquals(tot_mgr_resubmit.get(), 0);
-    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
+    SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
     assertEquals(tot_mgr_resubmit.get(), 0);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     int version = ZKUtil.checkExists(zkw, tasknode);
@@ -399,7 +409,7 @@ public class TestSplitLogManager {
     // create an orphan task in OWNED state
     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -414,7 +424,7 @@ public class TestSplitLogManager {
     for (int i = 0; i < (3 * to)/100; i++) {
       Thread.sleep(100);
       final ServerName worker2 = ServerName.valueOf("worker1,1,1");
-      slt = new SplitLogTask.Owned(worker2);
+      slt = new SplitLogTask.Owned(worker2, this.mode);
       ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
     }
 
@@ -438,7 +448,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     int version = ZKUtil.checkExists(zkw, tasknode);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     slm.handleDeadWorker(worker1);
@@ -463,7 +473,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
 
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
 
@@ -513,4 +523,25 @@ public class TestSplitLogManager {
 
     assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
   }
+  
+  @Test(timeout=60000)
+  public void testGetPreviousRecoveryMode() throws Exception {
+    LOG.info("testGetPreviousRecoveryMode");
+    SplitLogCounters.resetCounters();
+    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
+      new SplitLogTask.Unassigned(
+        ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+    slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
+    assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
+    
+    zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
+    slm.setRecoveryMode(false);
+    assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 42e9e8f..8ffc719 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -108,7 +108,8 @@ public class TestRegionServerNoMaster {
     // We reopen. We need a ZK node here, as a open is always triggered by a master.
     ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
     // first version is '0'
-    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+      getRS().getServerName(), hri, 0, null, null);
     AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
     Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
     Assert.assertTrue(responseOpen.getOpeningState(0).
@@ -227,7 +228,8 @@ public class TestRegionServerNoMaster {
 
     // We're sending multiple requests in a row. The region server must handle this nicely.
     for (int i = 0; i < 10; i++) {
-      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+        getRS().getServerName(), hri, 0, null, null);
       AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
       Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
 
@@ -248,7 +250,8 @@ public class TestRegionServerNoMaster {
     try {
       // fake region to be closing now, need to clear state afterwards
       getRS().regionsInTransitionInRS.put(hri.getEncodedNameAsBytes(), Boolean.FALSE);
-      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+        getRS().getServerName(), hri, 0, null, null);
       getRS().openRegion(null, orr);
       Assert.fail("The closing region should not be opened");
     } catch (ServiceException se) {
@@ -403,7 +406,8 @@ public class TestRegionServerNoMaster {
     //actual close
     closeNoZK();
     try {
-      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(earlierServerName, hri, 0, null);
+      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+        earlierServerName, hri, 0, null, null);
       getRS().openRegion(null, orr);
       Assert.fail("The openRegion should have been rejected");
     } catch (ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index eaf5547..dcb1e88 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
@@ -38,6 +40,8 @@ import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -64,6 +68,7 @@ public class TestSplitLogWorker {
   private ZooKeeperWatcher zkw;
   private SplitLogWorker slw;
   private ExecutorService executorService;
+  private RecoveryMode mode;
 
   private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
       throws Exception {
@@ -98,6 +103,7 @@ public class TestSplitLogWorker {
   @Before
   public void setup() throws Exception {
     TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
     zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
         "split-log-worker-tests", null);
     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
@@ -112,6 +118,8 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     executorService = new ExecutorService("TestSplitLogWorker");
     executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -126,7 +134,7 @@ public class TestSplitLogWorker {
     new SplitLogWorker.TaskExecutor() {
 
       @Override
-      public Status exec(String name, CancelableProgressable p) {
+      public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
         while (true) {
           try {
             Thread.sleep(1000);
@@ -149,7 +157,8 @@ public class TestSplitLogWorker {
     final ServerName RS = ServerName.valueOf("rs,1,1");
     RegionServerServices mockedRS = getRegionServer(RS);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
-      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
+      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
     SplitLogWorker slw =
@@ -184,8 +193,8 @@ public class TestSplitLogWorker {
     final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
     final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
-      new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
+      new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     RegionServerServices mockedRS1 = getRegionServer(SVR1);
     RegionServerServices mockedRS2 = getRegionServer(SVR2);
     SplitLogWorker slw1 =
@@ -227,15 +236,15 @@ public class TestSplitLogWorker {
 
       // this time create a task node after starting the splitLogWorker
       zkw.getRecoverableZooKeeper().create(PATH,
-        new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
+        new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
       assertEquals(1, slw.taskReadySeq);
       byte [] bytes = ZKUtil.getData(zkw, PATH);
       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
       assertTrue(slt.isOwned(SRV));
-      slt = new SplitLogTask.Owned(MANAGER);
+      slt = new SplitLogTask.Owned(MANAGER, this.mode);
       ZKUtil.setData(zkw, PATH, slt.toByteArray());
       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
     } finally {
@@ -258,7 +267,8 @@ public class TestSplitLogWorker {
       Thread.sleep(100);
       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
 
-      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
+      SplitLogTask unassignedManager = 
+        new SplitLogTask.Unassigned(MANAGER, this.mode);
       zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
@@ -272,7 +282,7 @@ public class TestSplitLogWorker {
 
       // preempt the first task, have it owned by another worker
       final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
-      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
+      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
       ZKUtil.setData(zkw, PATH1, slt.toByteArray());
       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
 
@@ -298,7 +308,7 @@ public class TestSplitLogWorker {
     Thread.sleep(100);
 
     String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
-    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
+    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
     zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
       CreateMode.PERSISTENT);
 
@@ -351,8 +361,8 @@ public class TestSplitLogWorker {
 
     for (int i = 0; i < maxTasks; i++) {
       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
@@ -394,9 +404,8 @@ public class TestSplitLogWorker {
 
     for (int i = 0; i < maxTasks; i++) {
       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
-        Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 6e0995c..af15d78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -390,6 +390,7 @@ public class TestSplitTransactionOnCluster {
       AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
       admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
+      cluster.startRegionServer();
       t.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
index 5b68f9f..5d304a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -111,8 +112,10 @@ public class TestHLogMethods {
   @Test
   public void testEntrySink() throws Exception {
     Configuration conf = new Configuration();
+    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+      RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     HLogSplitter splitter = new HLogSplitter(
-      conf, mock(Path.class), mock(FileSystem.class), null, null);
+      conf, mock(Path.class), mock(FileSystem.class), null, null, mode);
 
     EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
     for (int i = 0; i < 1000; i++) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 57a2549..8a6f544 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
@@ -127,6 +128,7 @@ public class TestHLogSplit {
   private static String ROBBER;
   private static String ZOMBIE;
   private static String [] GROUP = new String [] {"supergroup"};
+  private RecoveryMode mode;
 
   static enum Corruptions {
     INSERT_GARBAGE_ON_FIRST_LINE,
@@ -177,6 +179,8 @@ public class TestHLogSplit {
     REGIONS.clear();
     Collections.addAll(REGIONS, "bbb", "ccc");
     InstrumentedSequenceFileLogWriter.activateFailure = false;
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -805,7 +809,7 @@ public class TestHLogSplit {
       logfiles != null && logfiles.length > 0);
     // Set up a splitter that will throw an IOE on the output side
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, fs, null, null) {
+        conf, HBASEDIR, fs, null, null, this.mode) {
       protected HLog.Writer createWriter(FileSystem fs,
           Path logfile, Configuration conf) throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@@ -938,7 +942,7 @@ public class TestHLogSplit {
     try {
       conf.setInt("hbase.splitlog.report.period", 1000);
       boolean ret = HLogSplitter.splitLogFile(
-        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null);
+        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
       assertFalse("Log splitting should failed", ret);
       assertTrue(count.get() > 0);
     } catch (IOException e) {
@@ -997,7 +1001,7 @@ public class TestHLogSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
-        localConf, HBASEDIR, fs, null, null) {
+        localConf, HBASEDIR, fs, null, null, this.mode) {
 
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1282,7 +1286,7 @@ public class TestHLogSplit {
       logfiles != null && logfiles.length > 0);
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, fs, null, null) {
+        conf, HBASEDIR, fs, null, null, this.mode) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index c5f5a2f..5b51eca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
@@ -98,6 +99,8 @@ public class TestWALReplay {
   private Path logDir;
   private FileSystem fs;
   private Configuration conf;
+  private RecoveryMode mode;
+  
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -128,6 +131,8 @@ public class TestWALReplay {
     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
     }
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -875,7 +880,7 @@ public class TestWALReplay {
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
     HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
-      this.fs, this.conf, null, null, null);
+      this.fs, this.conf, null, null, null, mode);
     FileStatus[] listStatus1 = this.fs.listStatus(
         new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
             new Path(hri.getEncodedName(), "recovered.edits")));


[2/2] git commit: HBASE-11094: Distributed log replay is incompatible for rolling restarts

Posted by je...@apache.org.
HBASE-11094: Distributed log replay is incompatible for rolling restarts


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/34ae4a94
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/34ae4a94
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/34ae4a94

Branch: refs/heads/0.98
Commit: 34ae4a94d0795850642915f117b7d1257b418ca5
Parents: 9ce1751
Author: Jeffrey Zhong <je...@apache.org>
Authored: Sun Jun 15 11:44:17 2014 -0700
Committer: Jeffrey Zhong <je...@apache.org>
Committed: Sun Jun 15 11:44:17 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   2 +-
 .../hadoop/hbase/protobuf/RequestConverter.java |  21 +-
 .../hbase/protobuf/generated/AdminProtos.java   | 265 ++++++++++++++-----
 .../protobuf/generated/ZooKeeperProtos.java     | 239 +++++++++++++++--
 hbase-protocol/src/main/protobuf/Admin.proto    |   2 +
 .../src/main/protobuf/ZooKeeper.proto           |   6 +
 .../org/apache/hadoop/hbase/SplitLogTask.java   |  40 ++-
 .../org/apache/hadoop/hbase/master/HMaster.java |  21 +-
 .../hadoop/hbase/master/MasterFileSystem.java   |  30 ++-
 .../hadoop/hbase/master/ServerManager.java      |  10 +-
 .../hadoop/hbase/master/SplitLogManager.java    | 156 +++++++++--
 .../handler/MetaServerShutdownHandler.java      |   8 +-
 .../master/handler/ServerShutdownHandler.java   |  13 +-
 .../hbase/regionserver/HRegionServer.java       |  26 +-
 .../hbase/regionserver/SplitLogWorker.java      |  36 ++-
 .../handler/HLogSplitterHandler.java            |  13 +-
 .../hbase/regionserver/wal/HLogSplitter.java    |  22 +-
 .../apache/hadoop/hbase/TestSerialization.java  |   4 +-
 .../hadoop/hbase/client/TestMultiParallel.java  |   4 +-
 .../hbase/master/TestAssignmentManager.java     |   5 +
 .../master/TestDistributedLogSplitting.java     |   3 +-
 .../hbase/master/TestMasterFileSystem.java      |   4 +-
 .../hbase/master/TestSplitLogManager.java       |  57 +++-
 .../regionserver/TestRegionServerNoMaster.java  |  12 +-
 .../hbase/regionserver/TestSplitLogWorker.java  |  39 +--
 .../TestSplitTransactionOnCluster.java          |   1 +
 .../hbase/regionserver/wal/TestHLogMethods.java |   5 +-
 .../hbase/regionserver/wal/TestHLogSplit.java   |  12 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |   7 +-
 29 files changed, 801 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 0f393d9..2a0b912 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1670,7 +1670,7 @@ public final class ProtobufUtil {
   public static void openRegion(final AdminService.BlockingInterface admin,
       ServerName server, final HRegionInfo region) throws IOException {
     OpenRegionRequest request =
-      RequestConverter.buildOpenRegionRequest(server, region, -1, null);
+      RequestConverter.buildOpenRegionRequest(server, region, -1, null, null);
     try {
       admin.openRegion(null, request);
     } catch (ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 685fd47..1a6b42d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.protobuf.HBaseZeroCopyByteString;
+
+import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -701,17 +703,18 @@ public final class RequestConverter {
   * Create a protocol buffer OpenRegionRequest to open a list of regions
   *
   * @param regionOpenInfos info of a list of regions to open
+  * @param openForReplay whether to open for distributed log replay; null leaves the flag unset
   * @return a protocol buffer OpenRegionRequest
   */
  public static OpenRegionRequest
      buildOpenRegionRequest(final List<Triple<HRegionInfo, Integer,
-         List<ServerName>>> regionOpenInfos) {
+         List<ServerName>>> regionOpenInfos, Boolean openForReplay) {
    OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
    for (Triple<HRegionInfo, Integer, List<ServerName>> regionOpenInfo: regionOpenInfos) {
      Integer second = regionOpenInfo.getSecond();
      int versionOfOfflineNode = second == null ? -1 : second.intValue();
-     builder.addOpenInfo(buildRegionOpenInfo(
-       regionOpenInfo.getFirst(), versionOfOfflineNode, regionOpenInfo.getThird()));
+     builder.addOpenInfo(buildRegionOpenInfo(regionOpenInfo.getFirst(), versionOfOfflineNode, 
+       regionOpenInfo.getThird(), openForReplay));
    }
    return builder.build();
  }
@@ -723,12 +726,15 @@ public final class RequestConverter {
   * @param region the region to open
   * @param versionOfOfflineNode that needs to be present in the offline node
   * @param favoredNodes
+  * @param openForReplay whether to open for distributed log replay; null leaves the flag unset
   * @return a protocol buffer OpenRegionRequest
   */
  public static OpenRegionRequest buildOpenRegionRequest(ServerName server,
-     final HRegionInfo region, final int versionOfOfflineNode, List<ServerName> favoredNodes) {
+     final HRegionInfo region, final int versionOfOfflineNode, List<ServerName> favoredNodes,
+     Boolean openForReplay) {
    OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
-   builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes));
+   builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes, 
+     openForReplay));
    if (server != null) {
      builder.setServerStartCode(server.getStartcode());
    }
@@ -1477,7 +1483,7 @@ public final class RequestConverter {
    */
   private static RegionOpenInfo buildRegionOpenInfo(
       final HRegionInfo region, final int versionOfOfflineNode,
-      final List<ServerName> favoredNodes) {
+      final List<ServerName> favoredNodes, Boolean openForReplay) {
     RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
     builder.setRegion(HRegionInfo.convert(region));
     if (versionOfOfflineNode >= 0) {
@@ -1488,6 +1494,9 @@ public final class RequestConverter {
         builder.addFavoredNodes(ProtobufUtil.toServerName(server));
       }
     }
+    if(openForReplay != null) {
+      builder.setOpenForDistributedLogReplay(openForReplay);
+    }
     return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index 0ad10ad..636e51f 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -4032,6 +4032,24 @@ public final class AdminProtos {
        */
       org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getFavoredNodesOrBuilder(
           int index);
+
+      // optional bool openForDistributedLogReplay = 4;
+      /**
+       * <code>optional bool openForDistributedLogReplay = 4;</code>
+       *
+       * <pre>
+       * open region for distributedLogReplay
+       * </pre>
+       */
+      boolean hasOpenForDistributedLogReplay();
+      /**
+       * <code>optional bool openForDistributedLogReplay = 4;</code>
+       *
+       * <pre>
+       * open region for distributedLogReplay
+       * </pre>
+       */
+      boolean getOpenForDistributedLogReplay();
     }
     /**
      * Protobuf type {@code OpenRegionRequest.RegionOpenInfo}
@@ -4110,6 +4128,11 @@ public final class AdminProtos {
                 favoredNodes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.PARSER, extensionRegistry));
                 break;
               }
+              case 32: {
+                bitField0_ |= 0x00000004;
+                openForDistributedLogReplay_ = input.readBool();
+                break;
+              }
             }
           }
         } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4227,10 +4250,35 @@ public final class AdminProtos {
         return favoredNodes_.get(index);
       }
 
+      // optional bool openForDistributedLogReplay = 4;
+      public static final int OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER = 4;
+      private boolean openForDistributedLogReplay_;
+      /**
+       * <code>optional bool openForDistributedLogReplay = 4;</code>
+       *
+       * <pre>
+       * open region for distributedLogReplay
+       * </pre>
+       */
+      public boolean hasOpenForDistributedLogReplay() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional bool openForDistributedLogReplay = 4;</code>
+       *
+       * <pre>
+       * open region for distributedLogReplay
+       * </pre>
+       */
+      public boolean getOpenForDistributedLogReplay() {
+        return openForDistributedLogReplay_;
+      }
+
       private void initFields() {
         region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.getDefaultInstance();
         versionOfOfflineNode_ = 0;
         favoredNodes_ = java.util.Collections.emptyList();
+        openForDistributedLogReplay_ = false;
       }
       private byte memoizedIsInitialized = -1;
       public final boolean isInitialized() {
@@ -4267,6 +4315,9 @@ public final class AdminProtos {
         for (int i = 0; i < favoredNodes_.size(); i++) {
           output.writeMessage(3, favoredNodes_.get(i));
         }
+        if (((bitField0_ & 0x00000004) == 0x00000004)) {
+          output.writeBool(4, openForDistributedLogReplay_);
+        }
         getUnknownFields().writeTo(output);
       }
 
@@ -4288,6 +4339,10 @@ public final class AdminProtos {
           size += com.google.protobuf.CodedOutputStream
             .computeMessageSize(3, favoredNodes_.get(i));
         }
+        if (((bitField0_ & 0x00000004) == 0x00000004)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeBoolSize(4, openForDistributedLogReplay_);
+        }
         size += getUnknownFields().getSerializedSize();
         memoizedSerializedSize = size;
         return size;
@@ -4323,6 +4378,11 @@ public final class AdminProtos {
         }
         result = result && getFavoredNodesList()
             .equals(other.getFavoredNodesList());
+        result = result && (hasOpenForDistributedLogReplay() == other.hasOpenForDistributedLogReplay());
+        if (hasOpenForDistributedLogReplay()) {
+          result = result && (getOpenForDistributedLogReplay()
+              == other.getOpenForDistributedLogReplay());
+        }
         result = result &&
             getUnknownFields().equals(other.getUnknownFields());
         return result;
@@ -4348,6 +4408,10 @@ public final class AdminProtos {
           hash = (37 * hash) + FAVORED_NODES_FIELD_NUMBER;
           hash = (53 * hash) + getFavoredNodesList().hashCode();
         }
+        if (hasOpenForDistributedLogReplay()) {
+          hash = (37 * hash) + OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER;
+          hash = (53 * hash) + hashBoolean(getOpenForDistributedLogReplay());
+        }
         hash = (29 * hash) + getUnknownFields().hashCode();
         memoizedHashCode = hash;
         return hash;
@@ -4473,6 +4537,8 @@ public final class AdminProtos {
           } else {
             favoredNodesBuilder_.clear();
           }
+          openForDistributedLogReplay_ = false;
+          bitField0_ = (bitField0_ & ~0x00000008);
           return this;
         }
 
@@ -4522,6 +4588,10 @@ public final class AdminProtos {
           } else {
             result.favoredNodes_ = favoredNodesBuilder_.build();
           }
+          if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+            to_bitField0_ |= 0x00000004;
+          }
+          result.openForDistributedLogReplay_ = openForDistributedLogReplay_;
           result.bitField0_ = to_bitField0_;
           onBuilt();
           return result;
@@ -4570,6 +4640,9 @@ public final class AdminProtos {
               }
             }
           }
+          if (other.hasOpenForDistributedLogReplay()) {
+            setOpenForDistributedLogReplay(other.getOpenForDistributedLogReplay());
+          }
           this.mergeUnknownFields(other.getUnknownFields());
           return this;
         }
@@ -5001,6 +5074,55 @@ public final class AdminProtos {
           return favoredNodesBuilder_;
         }
 
+        // optional bool openForDistributedLogReplay = 4;
+        private boolean openForDistributedLogReplay_ ;
+        /**
+         * <code>optional bool openForDistributedLogReplay = 4;</code>
+         *
+         * <pre>
+         * open region for distributedLogReplay
+         * </pre>
+         */
+        public boolean hasOpenForDistributedLogReplay() {
+          return ((bitField0_ & 0x00000008) == 0x00000008);
+        }
+        /**
+         * <code>optional bool openForDistributedLogReplay = 4;</code>
+         *
+         * <pre>
+         * open region for distributedLogReplay
+         * </pre>
+         */
+        public boolean getOpenForDistributedLogReplay() {
+          return openForDistributedLogReplay_;
+        }
+        /**
+         * <code>optional bool openForDistributedLogReplay = 4;</code>
+         *
+         * <pre>
+         * open region for distributedLogReplay
+         * </pre>
+         */
+        public Builder setOpenForDistributedLogReplay(boolean value) {
+          bitField0_ |= 0x00000008;
+          openForDistributedLogReplay_ = value;
+          onChanged();
+          return this;
+        }
+        /**
+         * <code>optional bool openForDistributedLogReplay = 4;</code>
+         *
+         * <pre>
+         * open region for distributedLogReplay
+         * </pre>
+         */
+        public Builder clearOpenForDistributedLogReplay() {
+          bitField0_ = (bitField0_ & ~0x00000008);
+          openForDistributedLogReplay_ = false;
+          onChanged();
+          return this;
+        }
+
         // @@protoc_insertion_point(builder_scope:OpenRegionRequest.RegionOpenInfo)
       }
 
@@ -21166,77 +21288,78 @@ public final class AdminProtos {
       "FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" +
       "nlineRegionRequest\";\n\027GetOnlineRegionRes" +
       "ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" +
-      "\326\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
+      "\374\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
       "2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" +
-      "erverStartCode\030\002 \001(\004\032r\n\016RegionOpenInfo\022\033" +
-      "\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_o" +
-      "f_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 " +
-      "\003(\0132\013.ServerName\"\235\001\n\022OpenRegionResponse\022",
-      "=\n\ropening_state\030\001 \003(\0162&.OpenRegionRespo" +
-      "nse.RegionOpeningState\"H\n\022RegionOpeningS" +
-      "tate\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016" +
-      "FAILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022" +
-      " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027ver" +
-      "sion_of_closing_node\030\002 \001(\r\022\036\n\020transition" +
-      "_in_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server" +
-      "\030\004 \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005" +
-      " \001(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 " +
-      "\002(\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(",
-      "\0132\020.RegionSpecifier\022\030\n\020if_older_than_ts\030" +
-      "\002 \001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flu" +
-      "sh_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitR" +
-      "egionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
-      "cifier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegi" +
-      "onResponse\"W\n\024CompactRegionRequest\022 \n\006re" +
-      "gion\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 " +
-      "\001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionResp" +
-      "onse\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013up" +
-      "date_info\030\001 \003(\0132+.UpdateFavoredNodesRequ",
-      "est.RegionUpdateInfo\032S\n\020RegionUpdateInfo" +
-      "\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored" +
-      "_nodes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavor" +
-      "edNodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Mer" +
-      "geRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Reg" +
-      "ionSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionS" +
-      "pecifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Mer" +
-      "geRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002" +
-      "(\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025" +
-      "associated_cell_count\030\003 \001(\005\"4\n\030Replicate",
-      "WALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntr" +
-      "y\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWA" +
-      "LWriterRequest\"0\n\025RollWALWriterResponse\022" +
-      "\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRe" +
-      "quest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespo" +
-      "nse\"\026\n\024GetServerInfoRequest\"B\n\nServerInf" +
-      "o\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nw" +
-      "ebui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse" +
-      "\022 \n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014A" +
-      "dminService\022>\n\rGetRegionInfo\022\025.GetRegion",
-      "InfoRequest\032\026.GetRegionInfoResponse\022;\n\014G" +
-      "etStoreFile\022\024.GetStoreFileRequest\032\025.GetS" +
-      "toreFileResponse\022D\n\017GetOnlineRegion\022\027.Ge" +
-      "tOnlineRegionRequest\032\030.GetOnlineRegionRe" +
-      "sponse\0225\n\nOpenRegion\022\022.OpenRegionRequest" +
-      "\032\023.OpenRegionResponse\0228\n\013CloseRegion\022\023.C" +
-      "loseRegionRequest\032\024.CloseRegionResponse\022" +
-      "8\n\013FlushRegion\022\023.FlushRegionRequest\032\024.Fl" +
-      "ushRegionResponse\0228\n\013SplitRegion\022\023.Split" +
-      "RegionRequest\032\024.SplitRegionResponse\022>\n\rC",
-      "ompactRegion\022\025.CompactRegionRequest\032\026.Co" +
-      "mpactRegionResponse\022;\n\014MergeRegions\022\024.Me" +
-      "rgeRegionsRequest\032\025.MergeRegionsResponse" +
-      "\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEntr" +
-      "yRequest\032\032.ReplicateWALEntryResponse\022?\n\006" +
-      "Replay\022\031.ReplicateWALEntryRequest\032\032.Repl" +
-      "icateWALEntryResponse\022>\n\rRollWALWriter\022\025" +
-      ".RollWALWriterRequest\032\026.RollWALWriterRes" +
-      "ponse\022>\n\rGetServerInfo\022\025.GetServerInfoRe" +
-      "quest\032\026.GetServerInfoResponse\0225\n\nStopSer",
-      "ver\022\022.StopServerRequest\032\023.StopServerResp" +
-      "onse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavor" +
-      "edNodesRequest\032\033.UpdateFavoredNodesRespo" +
-      "nseBA\n*org.apache.hadoop.hbase.protobuf." +
-      "generatedB\013AdminProtosH\001\210\001\001\240\001\001"
+      "erverStartCode\030\002 \001(\004\032\227\001\n\016RegionOpenInfo\022" +
+      "\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_" +
+      "of_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003" +
+      " \003(\0132\013.ServerName\022#\n\033openForDistributedL",
+      "ogReplay\030\004 \001(\010\"\235\001\n\022OpenRegionResponse\022=\n" +
+      "\ropening_state\030\001 \003(\0162&.OpenRegionRespons" +
+      "e.RegionOpeningState\"H\n\022RegionOpeningSta" +
+      "te\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FA" +
+      "ILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022 \n" +
+      "\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027versi" +
+      "on_of_closing_node\030\002 \001(\r\022\036\n\020transition_i" +
+      "n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004" +
+      " \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" +
+      "(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(",
+      "\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
+      "\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " +
+      "\001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flush" +
+      "_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitReg" +
+      "ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" +
+      "fier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegion" +
+      "Response\"W\n\024CompactRegionRequest\022 \n\006regi" +
+      "on\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 \001(" +
+      "\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionRespon" +
+      "se\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013upda",
+      "te_info\030\001 \003(\0132+.UpdateFavoredNodesReques" +
+      "t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" +
+      "\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored_n" +
+      "odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" +
+      "NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" +
+      "RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" +
+      "nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" +
+      "cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" +
+      "RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" +
+      "2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
+      "sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" +
+      "LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" +
+      "\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" +
+      "riterRequest\"0\n\025RollWALWriterResponse\022\027\n" +
+      "\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" +
+      "est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" +
+      "e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" +
+      " \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nweb" +
+      "ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " +
+      "\n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014Adm",
+      "inService\022>\n\rGetRegionInfo\022\025.GetRegionIn" +
+      "foRequest\032\026.GetRegionInfoResponse\022;\n\014Get" +
+      "StoreFile\022\024.GetStoreFileRequest\032\025.GetSto" +
+      "reFileResponse\022D\n\017GetOnlineRegion\022\027.GetO" +
+      "nlineRegionRequest\032\030.GetOnlineRegionResp" +
+      "onse\0225\n\nOpenRegion\022\022.OpenRegionRequest\032\023" +
+      ".OpenRegionResponse\0228\n\013CloseRegion\022\023.Clo" +
+      "seRegionRequest\032\024.CloseRegionResponse\0228\n" +
+      "\013FlushRegion\022\023.FlushRegionRequest\032\024.Flus" +
+      "hRegionResponse\0228\n\013SplitRegion\022\023.SplitRe",
+      "gionRequest\032\024.SplitRegionResponse\022>\n\rCom" +
+      "pactRegion\022\025.CompactRegionRequest\032\026.Comp" +
+      "actRegionResponse\022;\n\014MergeRegions\022\024.Merg" +
+      "eRegionsRequest\032\025.MergeRegionsResponse\022J" +
+      "\n\021ReplicateWALEntry\022\031.ReplicateWALEntryR" +
+      "equest\032\032.ReplicateWALEntryResponse\022?\n\006Re" +
+      "play\022\031.ReplicateWALEntryRequest\032\032.Replic" +
+      "ateWALEntryResponse\022>\n\rRollWALWriter\022\025.R" +
+      "ollWALWriterRequest\032\026.RollWALWriterRespo" +
+      "nse\022>\n\rGetServerInfo\022\025.GetServerInfoRequ",
+      "est\032\026.GetServerInfoResponse\0225\n\nStopServe" +
+      "r\022\022.StopServerRequest\032\023.StopServerRespon" +
+      "se\022M\n\022UpdateFavoredNodes\022\032.UpdateFavored" +
+      "NodesRequest\032\033.UpdateFavoredNodesRespons" +
+      "eBA\n*org.apache.hadoop.hbase.protobuf.ge" +
+      "neratedB\013AdminProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21290,7 +21413,7 @@ public final class AdminProtos {
           internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_OpenRegionRequest_RegionOpenInfo_descriptor,
-              new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", });
+              new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", "OpenForDistributedLogReplay", });
           internal_static_OpenRegionResponse_descriptor =
             getDescriptor().getMessageTypes().get(7);
           internal_static_OpenRegionResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 0af2a97..9d037f5 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -3230,6 +3230,16 @@ public final class ZooKeeperProtos {
      * <code>required .ServerName server_name = 2;</code>
      */
     org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder();
+
+    // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
+    /**
+     * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+     */
+    boolean hasMode();
+    /**
+     * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode();
   }
   /**
    * Protobuf type {@code SplitLogTask}
@@ -3312,6 +3322,17 @@ public final class ZooKeeperProtos {
               bitField0_ |= 0x00000002;
               break;
             }
+            case 24: {
+              int rawValue = input.readEnum();
+              org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.valueOf(rawValue);
+              if (value == null) {
+                unknownFields.mergeVarintField(3, rawValue);
+              } else {
+                bitField0_ |= 0x00000004;
+                mode_ = value;
+              }
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3460,6 +3481,97 @@ public final class ZooKeeperProtos {
       // @@protoc_insertion_point(enum_scope:SplitLogTask.State)
     }
 
+    /**
+     * Protobuf enum {@code SplitLogTask.RecoveryMode}
+     */
+    public enum RecoveryMode
+        implements com.google.protobuf.ProtocolMessageEnum {
+      /**
+       * <code>UNKNOWN = 0;</code>
+       */
+      UNKNOWN(0, 0),
+      /**
+       * <code>LOG_SPLITTING = 1;</code>
+       */
+      LOG_SPLITTING(1, 1),
+      /**
+       * <code>LOG_REPLAY = 2;</code>
+       */
+      LOG_REPLAY(2, 2),
+      ;
+
+      /**
+       * <code>UNKNOWN = 0;</code>
+       */
+      public static final int UNKNOWN_VALUE = 0;
+      /**
+       * <code>LOG_SPLITTING = 1;</code>
+       */
+      public static final int LOG_SPLITTING_VALUE = 1;
+      /**
+       * <code>LOG_REPLAY = 2;</code>
+       */
+      public static final int LOG_REPLAY_VALUE = 2;
+
+
+      public final int getNumber() { return value; }
+
+      public static RecoveryMode valueOf(int value) {
+        switch (value) {
+          case 0: return UNKNOWN;
+          case 1: return LOG_SPLITTING;
+          case 2: return LOG_REPLAY;
+          default: return null;
+        }
+      }
+
+      public static com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>() {
+              public RecoveryMode findValueByNumber(int number) {
+                return RecoveryMode.valueOf(number);
+              }
+            };
+
+      public final com.google.protobuf.Descriptors.EnumValueDescriptor
+          getValueDescriptor() {
+        return getDescriptor().getValues().get(index);
+      }
+      public final com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptorForType() {
+        return getDescriptor();
+      }
+      public static final com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor().getEnumTypes().get(1);
+      }
+
+      private static final RecoveryMode[] VALUES = values();
+
+      public static RecoveryMode valueOf(
+          com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+        if (desc.getType() != getDescriptor()) {
+          throw new java.lang.IllegalArgumentException(
+            "EnumValueDescriptor is not for this type.");
+        }
+        return VALUES[desc.getIndex()];
+      }
+
+      private final int index;
+      private final int value;
+
+      private RecoveryMode(int index, int value) {
+        this.index = index;
+        this.value = value;
+      }
+
+      // @@protoc_insertion_point(enum_scope:SplitLogTask.RecoveryMode)
+    }
+
     private int bitField0_;
     // required .SplitLogTask.State state = 1;
     public static final int STATE_FIELD_NUMBER = 1;
@@ -3499,9 +3611,26 @@ public final class ZooKeeperProtos {
       return serverName_;
     }
 
+    // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
+    public static final int MODE_FIELD_NUMBER = 3;
+    private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_;
+    /**
+     * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+     */
+    public boolean hasMode() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
+      return mode_;
+    }
+
     private void initFields() {
       state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
       serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+      mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3533,6 +3662,9 @@ public final class ZooKeeperProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeMessage(2, serverName_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeEnum(3, mode_.getNumber());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -3550,6 +3682,10 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(2, serverName_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(3, mode_.getNumber());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3583,6 +3719,11 @@ public final class ZooKeeperProtos {
         result = result && getServerName()
             .equals(other.getServerName());
       }
+      result = result && (hasMode() == other.hasMode());
+      if (hasMode()) {
+        result = result &&
+            (getMode() == other.getMode());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -3604,6 +3745,10 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + SERVER_NAME_FIELD_NUMBER;
         hash = (53 * hash) + getServerName().hashCode();
       }
+      if (hasMode()) {
+        hash = (37 * hash) + MODE_FIELD_NUMBER;
+        hash = (53 * hash) + hashEnum(getMode());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -3728,6 +3873,8 @@ public final class ZooKeeperProtos {
           serverNameBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000002);
+        mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -3768,6 +3915,10 @@ public final class ZooKeeperProtos {
         } else {
           result.serverName_ = serverNameBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.mode_ = mode_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3790,6 +3941,9 @@ public final class ZooKeeperProtos {
         if (other.hasServerName()) {
           mergeServerName(other.getServerName());
         }
+        if (other.hasMode()) {
+          setMode(other.getMode());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3982,6 +4136,42 @@ public final class ZooKeeperProtos {
         return serverNameBuilder_;
       }
 
+      // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
+      private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
+      /**
+       * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+       */
+      public boolean hasMode() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
+        return mode_;
+      }
+      /**
+       * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+       */
+      public Builder setMode(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000004;
+        mode_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+       */
+      public Builder clearMode() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:SplitLogTask)
     }
 
@@ -9399,29 +9589,32 @@ public final class ZooKeeperProtos {
       "gionTransition\022\027\n\017event_type_code\030\001 \002(\r\022" +
       "\023\n\013region_name\030\002 \002(\014\022\023\n\013create_time\030\003 \002(" +
       "\004\022 \n\013server_name\030\004 \002(\0132\013.ServerName\022\017\n\007p" +
-      "ayload\030\005 \001(\014\"\231\001\n\014SplitLogTask\022\"\n\005state\030\001" +
+      "ayload\030\005 \001(\014\"\214\002\n\014SplitLogTask\022\"\n\005state\030\001" +
       " \002(\0162\023.SplitLogTask.State\022 \n\013server_name",
-      "\030\002 \002(\0132\013.ServerName\"C\n\005State\022\016\n\nUNASSIGN" +
-      "ED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022" +
-      "\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table" +
-      ".State:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n" +
-      "\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003" +
-      "\"%\n\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"" +
-      "^\n\020ReplicationState\022&\n\005state\030\001 \002(\0162\027.Rep" +
-      "licationState.State\"\"\n\005State\022\013\n\007ENABLED\020" +
-      "\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPositi" +
-      "on\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationLock\022",
-      "\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntab" +
-      "le_name\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030" +
-      "\002 \001(\0132\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n" +
-      "\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013cre" +
-      "ate_time\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013fam" +
-      "ily_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026R" +
-      "egionStoreSequenceIds\022 \n\030last_flushed_se" +
-      "quence_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003" +
-      "(\0132\020.StoreSequenceIdBE\n*org.apache.hadoo" +
-      "p.hbase.protobuf.generatedB\017ZooKeeperPro",
-      "tosH\001\210\001\001\240\001\001"
+      "\030\002 \002(\0132\013.ServerName\0221\n\004mode\030\003 \001(\0162\032.Spli" +
+      "tLogTask.RecoveryMode:\007UNKNOWN\"C\n\005State\022" +
+      "\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002" +
+      "\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\">\n\014RecoveryMode\022\013\n\007U" +
+      "NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" +
+      "Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" +
+      "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
+      "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017R" +
+      "eplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Re" +
+      "plicationState\022&\n\005state\030\001 \002(\0162\027.Replicat",
+      "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
+      "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
+      "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo" +
+      "ck_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_na" +
+      "me\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002 \001(\013" +
+      "2\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_s" +
+      "hared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_t" +
+      "ime\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013family_n" +
+      "ame\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Region" +
+      "StoreSequenceIds\022 \n\030last_flushed_sequenc",
+      "e_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020." +
+      "StoreSequenceIdBE\n*org.apache.hadoop.hba" +
+      "se.protobuf.generatedB\017ZooKeeperProtosH\001" +
+      "\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9457,7 +9650,7 @@ public final class ZooKeeperProtos {
           internal_static_SplitLogTask_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_SplitLogTask_descriptor,
-              new java.lang.String[] { "State", "ServerName", });
+              new java.lang.String[] { "State", "ServerName", "Mode", });
           internal_static_Table_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_Table_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index 5b889cd..ecf30f4 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -75,6 +75,8 @@ message OpenRegionRequest {
     required RegionInfo region = 1;
     optional uint32 version_of_offline_node = 2;
     repeated ServerName favored_nodes = 3;
+    // open region for distributedLogReplay
+    optional bool openForDistributedLogReplay = 4;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 082e1f7..37816da 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -85,8 +85,14 @@ message SplitLogTask {
     DONE = 3;
     ERR = 4;
   }
+  enum RecoveryMode {
+    UNKNOWN = 0;
+    LOG_SPLITTING = 1;
+    LOG_REPLAY = 2;
+  }
   required State state = 1;
   required ServerName server_name = 2;
+  optional RecoveryMode mode = 3 [default = UNKNOWN];
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
index 67a0994..1b5f8b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hbase;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -36,49 +39,59 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class SplitLogTask {
   private final ServerName originServer;
   private final ZooKeeperProtos.SplitLogTask.State state;
+  private final ZooKeeperProtos.SplitLogTask.RecoveryMode mode;
 
   public static class Unassigned extends SplitLogTask {
-    public Unassigned(final ServerName originServer) {
-      super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED);
+    public Unassigned(final ServerName originServer, final RecoveryMode mode) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, mode);
     }
   }
 
   public static class Owned extends SplitLogTask {
-    public Owned(final ServerName originServer) {
-      super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED);
+    public Owned(final ServerName originServer, final RecoveryMode mode) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED, mode);
     }
   }
 
   public static class Resigned extends SplitLogTask {
-    public Resigned(final ServerName originServer) {
-      super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED);
+    public Resigned(final ServerName originServer, final RecoveryMode mode) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED, mode);
     }
   }
 
   public static class Done extends SplitLogTask {
-    public Done(final ServerName originServer) {
-      super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE);
+    public Done(final ServerName originServer, final RecoveryMode mode) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE, mode);
     }
   }
 
   public static class Err extends SplitLogTask {
-    public Err(final ServerName originServer) {
-      super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR);
+    public Err(final ServerName originServer, final RecoveryMode mode) {
+      super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR, mode);
     }
   }
 
   SplitLogTask(final ZooKeeperProtos.SplitLogTask slt) {
-    this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState());
+    this.originServer = ProtobufUtil.toServerName(slt.getServerName());
+    this.state = slt.getState();
+    this.mode = (slt.hasMode()) ? slt.getMode() : 
+      ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
   }
 
-  SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) {
+  SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state,
+      final ZooKeeperProtos.SplitLogTask.RecoveryMode mode) {
     this.originServer = originServer;
     this.state = state;
+    this.mode = mode;
   }
 
   public ServerName getServerName() {
     return this.originServer;
   }
+  
+  public ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
+    return this.mode;
+  }
 
   public boolean isUnassigned(final ServerName sn) {
     return this.originServer.equals(sn) && isUnassigned();
@@ -167,7 +180,8 @@ public class SplitLogTask {
     // pbs just created.
     HBaseProtos.ServerName snpb = ProtobufUtil.toServerName(this.originServer);
     ZooKeeperProtos.SplitLogTask slts =
-      ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).build();
+      ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).
+      setMode(this.mode).build();
     return ProtobufUtil.prependPBMagic(slts.toByteArray());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 83a3ece..3d36915 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -111,15 +111,12 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.*;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
@@ -200,7 +197,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequ
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -209,6 +205,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -391,12 +388,6 @@ MasterServices, Server {
   /** The health check chore. */
   private HealthCheckChore healthCheckChore;
 
-  /**
-   * is in distributedLogReplay mode. When true, SplitLogWorker directly replays WAL edits to newly
-   * assigned region servers instead of creating recovered.edits files.
-   */
-  private final boolean distributedLogReplay;
-
   /** flag used in test cases in order to simulate RS failures during master initialization */
   private volatile boolean initializationBeforeMetaAssignment = false;
 
@@ -516,9 +507,6 @@ MasterServices, Server {
         Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
       }
     }
-
-    distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
   }
 
   /**
@@ -1039,7 +1027,8 @@ MasterServices, Server {
 
     enableMeta(TableName.META_TABLE_NAME);
 
-    if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
+    if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
+        && (!previouslyFailedMetaRSs.isEmpty())) {
       // replay WAL edits mode need new hbase:meta RS is assigned firstly
       status.setStatus("replaying log for Meta Region");
       this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
@@ -1063,7 +1052,7 @@ MasterServices, Server {
   }
 
   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
-    if (this.distributedLogReplay) {
+    if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
       // In log replay mode, we mark hbase:meta region as recovering in ZK
       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
       regions.add(HRegionInfo.FIRST_META_REGIONINFO);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 687fc75..2b2a5f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
@@ -122,14 +123,17 @@ public class MasterFileSystem {
     FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
     // make sure the fs has the same conf
     fs.setConf(conf);
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
     // setup the filesystem variable
     // set up the archived logs path
     this.oldLogDir = createInitialFileSystemLayout();
     HFileSystem.addLocationsOrderInterceptor(conf);
-    this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
-      master.getConfiguration(), master, services,
-      master.getServerName(), masterRecovery);
+    try {
+      this.splitLogManager = new SplitLogManager(master.getZooKeeper(), master.getConfiguration(),
+          master, services, master.getServerName());
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+    this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
   }
 
   /**
@@ -682,4 +686,22 @@ public class MasterFileSystem {
     }
     return null;
   }
+
+  /**
+   * Used by the server shutdown handlers (SSH) to set the recovery mode based on configuration
+   * once all outstanding log split tasks have drained.
+   * @throws IOException if the ZooKeeper operation fails; the KeeperException is wrapped as
+   *           its cause
+   */
+  public void setLogRecoveryMode() throws IOException {
+    try {
+      this.splitLogManager.setRecoveryMode(false);
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public RecoveryMode getLogRecoveryMode() {
+    return this.splitLogManager.getRecoveryMode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index e7b8532..4e4ad15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Triple;
@@ -662,8 +663,9 @@ public class ServerManager {
         " failed because no RPC connection found to this server");
       return RegionOpeningState.FAILED_OPENING;
     }
-    OpenRegionRequest request =
-      RequestConverter.buildOpenRegionRequest(server, region, versionOfOfflineNode, favoredNodes);
+    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, 
+      region, versionOfOfflineNode, favoredNodes, 
+      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
     try {
       OpenRegionResponse response = admin.openRegion(null, request);
       return ResponseConverter.getRegionOpeningState(response);
@@ -691,8 +693,8 @@ public class ServerManager {
       return null;
     }
 
-    OpenRegionRequest request =
-      RequestConverter.buildOpenRegionRequest(regionOpenInfos);
+    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(regionOpenInfos, 
+      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
     try {
       OpenRegionResponse response = admin.openRegion(null, request);
       return ResponseConverter.getRegionOpeningStateList(response);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index bdf60dd..2e98433 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.I
 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -45,16 +46,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
@@ -137,7 +141,8 @@ public class SplitLogManager extends ZooKeeperListener {
    */
   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
 
-  final boolean distributedLogReplay;
+  private volatile RecoveryMode recoveryMode;
+  private volatile boolean isDrainingDone = false;
 
   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
   private TimeoutMonitor timeoutMonitor;
@@ -158,9 +163,12 @@ public class SplitLogManager extends ZooKeeperListener {
    * @param stopper the stoppable in case anything is wrong
    * @param master the master services
    * @param serverName the master server name
+   * @throws KeeperException
+   * @throws InterruptedIOException
    */
-  public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-      Stoppable stopper, MasterServices master, ServerName serverName) {
+    public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
+       Stoppable stopper, MasterServices master, ServerName serverName)
+       throws InterruptedIOException, KeeperException {
     this(zkw, conf, stopper, master, serverName, false, null);
   }
 
@@ -178,9 +186,12 @@ public class SplitLogManager extends ZooKeeperListener {
    * @param master the master services
    * @param serverName the master server name
    * @param masterRecovery an indication if the master is in recovery
+   * @throws KeeperException
+   * @throws InterruptedIOException
    */
   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-      Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) {
+      Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) 
+      throws InterruptedIOException, KeeperException {
     this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String logfile) {
@@ -207,10 +218,13 @@ public class SplitLogManager extends ZooKeeperListener {
    * @param serverName the master server name
    * @param masterRecovery an indication if the master is in recovery
    * @param tf task finisher
+   * @throws KeeperException
+   * @throws InterruptedIOException
    */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
         Stoppable stopper, MasterServices master,
-        ServerName serverName, boolean masterRecovery, TaskFinisher tf) {
+        ServerName serverName, boolean masterRecovery, TaskFinisher tf) 
+      throws InterruptedIOException, KeeperException {
     super(zkw);
     this.taskFinisher = tf;
     this.conf = conf;
@@ -221,9 +235,12 @@ public class SplitLogManager extends ZooKeeperListener {
     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
     this.unassignedTimeout =
       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
+
+    // Determine recovery mode  
+    setRecoveryMode(true);
+
     LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
-      ", distributedLogReplay=" + this.distributedLogReplay);
+      ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
 
     this.serverName = serverName;
     this.timeoutMonitor = new TimeoutMonitor(
@@ -486,8 +503,7 @@ public class SplitLogManager extends ZooKeeperListener {
    */
   private void
       removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
-
-    if (!this.distributedLogReplay) {
+    if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
       // the function is only used in WALEdit direct replay mode
       return;
     }
@@ -515,7 +531,7 @@ public class SplitLogManager extends ZooKeeperListener {
       if (count == 0 && this.master.isInitialized()
           && !this.master.getServerManager().areDeadServersInProgress()) {
         // no splitting work items left
-        deleteRecoveringRegionZNodes(null);
+        deleteRecoveringRegionZNodes(watcher, null);
         // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
         // this point.
         lastRecoveringNodeCreationTime = Long.MAX_VALUE;
@@ -571,14 +587,6 @@ public class SplitLogManager extends ZooKeeperListener {
   void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
       throws KeeperException {
 
-    if (!this.distributedLogReplay) {
-      // remove any regions in recovery from ZK which could happen when we turn the feature on
-      // and later turn it off
-      ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
-      // the function is only used in distributedLogReplay mode when master is in initialization
-      return;
-    }
-
     Set<String> knownFailedServers = new HashSet<String>();
     if (failedServers != null) {
       for (ServerName tmpServerName : failedServers) {
@@ -641,7 +649,7 @@ public class SplitLogManager extends ZooKeeperListener {
     }
   }
 
-  private void deleteRecoveringRegionZNodes(List<String> regions) {
+  public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
     try {
       if (regions == null) {
         // remove all children under /home/recovering-regions
@@ -699,7 +707,7 @@ public class SplitLogManager extends ZooKeeperListener {
   }
 
   private void createNode(String path, Long retry_count) {
-    SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
+    SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
     return;
@@ -874,7 +882,7 @@ public class SplitLogManager extends ZooKeeperListener {
     task.incarnation++;
     try {
       // blocking zk call but this is done from the timeout thread
-      SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
+      SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
       if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
         LOG.debug("failed to resubmit task " + path +
             " version changed");
@@ -967,7 +975,7 @@ public class SplitLogManager extends ZooKeeperListener {
     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
     // therefore this behavior is safe.
     lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
-    SplitLogTask slt = new SplitLogTask.Done(this.serverName);
+    SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
       create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
@@ -1115,7 +1123,7 @@ public class SplitLogManager extends ZooKeeperListener {
    */
   void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
       throws KeeperException {
-    if (userRegions == null || !this.distributedLogReplay) {
+    if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
       return;
     }
 
@@ -1251,6 +1259,108 @@ public class SplitLogManager extends ZooKeeperListener {
     }
     return result;
   }
+  
+  /**
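+   * Sets the recovery mode, either from recovering regions or split log tasks left outstanding
+   * from before, or otherwise from the current configuration setting.
+   * @param isForInitialization true when called from the constructor; only then is the mode read
+   *          back from the outstanding split log tasks themselves
+   * @throws KeeperException if a ZooKeeper operation fails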
+   */
+  public void setRecoveryMode(boolean isForInitialization) throws KeeperException {
+    if(this.isDrainingDone) {
+      // when there is no outstanding splitlogtask after master start up, we already have up to date
+      // recovery mode
+      return;
+    }
+    if(this.watcher == null) {
+      // watcher is null only in testing code, where recovery mode can only be LOG_SPLITTING
+      this.isDrainingDone = true;
+      this.recoveryMode = RecoveryMode.LOG_SPLITTING;
+      return;
+    }
+    boolean hasSplitLogTask = false;
+    boolean hasRecoveringRegions = false;
+    RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
+    RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? 
+      RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
+
+    // Firstly check if there are outstanding recovering regions
+    List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+    if (regions != null && !regions.isEmpty()) {
+      hasRecoveringRegions = true;
+      previousRecoveryMode = RecoveryMode.LOG_REPLAY;
+    }
+    if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+      // Secondly check if there are outstanding split log task
+      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+      if (tasks != null && !tasks.isEmpty()) {
+        hasSplitLogTask = true;
+        if (isForInitialization) {
+          // during initialization, try to get recovery mode from splitlogtask
+          for (String task : tasks) {
+            try {
+              byte[] data = ZKUtil.getData(this.watcher,
+                ZKUtil.joinZNode(watcher.splitLogZNode, task));
+              if (data == null) continue;
+              SplitLogTask slt = SplitLogTask.parseFrom(data);
+              previousRecoveryMode = slt.getMode();
+              if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+                // created by old code base where we don't set recovery mode in splitlogtask
+                // we can safely set to LOG_SPLITTING because we're in master initialization code 
+                // before SSH is enabled & there is no outstanding recovering regions
+                previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
+              }
+              break;
+            } catch (DeserializationException e) {
+              LOG.warn("Failed parse data for znode " + task, e);
+            }
+          }
+        }
+      }
+    }
+
+    synchronized(this) {
+      if(this.isDrainingDone) {
+        return;
+      }
+      if (!hasSplitLogTask && !hasRecoveringRegions) {
+        this.isDrainingDone = true;
+        this.recoveryMode = recoveryModeInConfig;
+        return;
+      } else if (!isForInitialization) {
+        // splitlogtask hasn't drained yet, keep existing recovery mode
+        return;
+      }
+  
+      if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
+        this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
+        this.recoveryMode = previousRecoveryMode;
+      } else {
+        this.recoveryMode = recoveryModeInConfig;
+      }
+    }
+  }
+
+  public RecoveryMode getRecoveryMode() {
+    return this.recoveryMode;
+  }
+  
+  /**
+   * Returns whether distributed log replay is turned on
+   * @param conf the configuration to read the settings from
+   * @return true when distributed log replay is enabled and the hfile format version is at least 3
+   */
+  private boolean isDistributedLogReplay(Configuration conf) {
+    boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
+    }
+    // For distributed log replay, hfile version must be 3 at least; we need tag support.
+    return dlr && (version >= 3);
+  }
 
   /**
    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
index 03acccf..ce58e90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.DeadServer;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.zookeeper.KeeperException;
 
@@ -62,10 +63,13 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
     boolean gotException = true; 
     try {
       AssignmentManager am = this.services.getAssignmentManager();
+      this.services.getMasterFileSystem().setLogRecoveryMode();
+      boolean distributedLogReplay = 
+        (this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
       try {
         if (this.shouldSplitHlog) {
           LOG.info("Splitting hbase:meta logs for " + serverName);
-          if (this.distributedLogReplay) {
+          if (distributedLogReplay) {
             Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
             regions.add(HRegionInfo.FIRST_META_REGIONINFO);
             this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
@@ -97,7 +101,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
       }
 
       try {
-        if (this.shouldSplitHlog && this.distributedLogReplay) {
+        if (this.shouldSplitHlog && distributedLogReplay) {
           if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
             regionAssignmentWaitTimeout)) {
             // Wait here is to avoid log replay hits current dead server and incur a RPC timeout

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index a08dc3c..8a9e9c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;
@@ -63,7 +64,6 @@ public class ServerShutdownHandler extends EventHandler {
   protected final MasterServices services;
   protected final DeadServer deadServers;
   protected final boolean shouldSplitHlog; // whether to split HLog or not
-  protected final boolean distributedLogReplay;
   protected final int regionAssignmentWaitTimeout;
 
   public ServerShutdownHandler(final Server server, final MasterServices services,
@@ -85,7 +85,6 @@ public class ServerShutdownHandler extends EventHandler {
       LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
     }
     this.shouldSplitHlog = shouldSplitHlog;
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(server.getConfiguration());
     this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
       HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
   }
@@ -186,10 +185,16 @@ public class ServerShutdownHandler extends EventHandler {
         throw new IOException("Server is stopped");
       }
 
+      // setting the recovery mode from configuration is delayed until all outstanding
+      // splitlogtasks have drained
+      this.services.getMasterFileSystem().setLogRecoveryMode();
+      boolean distributedLogReplay = 
+        (this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
+
       try {
         if (this.shouldSplitHlog) {
           LOG.info("Splitting logs for " + serverName + " before assignment.");
-          if (this.distributedLogReplay) {
+          if (distributedLogReplay) {
             LOG.info("Mark regions in recovery before assignment.");
             Set<ServerName> serverNames = new HashSet<ServerName>();
             serverNames.add(serverName);
@@ -288,7 +293,7 @@ public class ServerShutdownHandler extends EventHandler {
         throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
       }
 
-      if (this.shouldSplitHlog && this.distributedLogReplay) {
+      if (this.shouldSplitHlog && distributedLogReplay) {
         // wait for region assignment completes
         for (HRegionInfo hri : toAssignRegions) {
           try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 569983e..1f3736d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -495,9 +495,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
 
   private RegionServerProcedureManagerHost rspmHost;
 
-  // configuration setting on if replay WAL edits directly to another RS
-  private final boolean distributedLogReplay;
-
   // Table level lock manager for locking for region operations
   private TableLockManager tableLockManager;
 
@@ -630,7 +627,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     // Put up the webui. Webui may come up on port other than configured if
     // that port is occupied. Adjust serverInfo if this is the case.
     this.rsInfo.setInfoPort(putUpWebUI());
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
   }
 
   /**
@@ -764,9 +760,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
 
     // register watcher for recovering regions
-    if(this.distributedLogReplay) {
-      this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
-    }
+    this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
   }
 
   /**
@@ -3670,10 +3664,20 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
 
         if (previous == null) {
           // check if the region to be opened is marked in recovering state in ZK
-          if (this.distributedLogReplay
-              && SplitLogManager.isRegionMarkedRecoveringInZK(this.getZooKeeper(),
-            region.getEncodedName())) {
-            this.recoveringRegions.put(region.getEncodedName(), null);
+          if (SplitLogManager.isRegionMarkedRecoveringInZK(this.zooKeeper,
+                region.getEncodedName())) {
+            // Check whether this region open is for distributedLogReplay. This check supports
+            // rolling restart/upgrade, where we want Master and RS to see the same configuration.
+            if (!regionOpenInfo.hasOpenForDistributedLogReplay() 
+                  || regionOpenInfo.getOpenForDistributedLogReplay()) {
+              this.recoveringRegions.put(region.getEncodedName(), null);
+            } else {
+              // Remove the stale recovering-region znode from ZK when the region is opened but not
+              // for recovery, which can happen when distributedLogReplay is turned from on to off.
+              List<String> tmpRegions = new ArrayList<String>();
+              tmpRegions.add(region.getEncodedName());
+              SplitLogManager.deleteRecoveringRegionZNodes(this.zooKeeper, tmpRegions);
+            }
           }
           // If there is no action in progress, we can submit a specific handler.
           // Need to pass the expected version in the constructor.

http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 4a83741..c1b2cb7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
@@ -125,7 +126,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
       RegionServerServices server, final LastSequenceId sequenceIdChecker) {
     this(watcher, conf, server, new TaskExecutor() {
       @Override
-      public Status exec(String filename, CancelableProgressable p) {
+      public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
         Path rootdir;
         FileSystem fs;
         try {
@@ -140,7 +141,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
         // encountered a bad non-retry-able persistent error.
         try {
           if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
-            fs, conf, p, sequenceIdChecker, watcher)) {
+            fs, conf, p, sequenceIdChecker, watcher, mode)) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {
@@ -174,11 +175,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
     try {
       LOG.info("SplitLogWorker " + this.serverName + " starting");
       this.watcher.registerListener(this);
-      boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
-      if (distributedLogReplay) {
-        // initialize a new connection for splitlogworker configuration
-        HConnectionManager.getConnection(conf);
-      }
+      // pre-initialize a new connection for splitlogworker configuration
+      HConnectionManager.getConnection(conf);
 
       // wait for master to create the splitLogZnode
       int res = -1;
@@ -313,7 +311,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
    */
   private void grabTask(String path) {
     Stat stat = new Stat();
-    long t = -1;
     byte[] data;
     synchronized (grabTaskLock) {
       currentTask = path;
@@ -346,14 +343,15 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
         return;
       }
 
-      currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
+      currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(), 
+        stat.getVersion());
       if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
         return;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
-        HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
+        HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
           SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
         return;
       }
@@ -362,7 +360,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
       getDataSetWatchAsync();
 
-      submitTask(path, currentVersion, this.report_period);
+      submitTask(path, slt.getMode(), currentVersion, this.report_period);
 
       // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
       try {
@@ -397,10 +395,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
    * @return non-negative integer value when task can be owned by current region server otherwise -1
    */
   protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
-      ServerName server, String task, int taskZKVersion) {
+      ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
     int latestZKVersion = FAILED_TO_OWN_TASK;
     try {
-      SplitLogTask slt = new SplitLogTask.Owned(server);
+      SplitLogTask slt = new SplitLogTask.Owned(server, mode);
       Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
       if (stat == null) {
         LOG.warn("zk.setData() returned null for path " + task);
@@ -458,7 +456,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
    * @param curTask
    * @param curTaskZKVersion
    */
-  void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
+  void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion, 
+    final int reportPeriod) {
     final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
 
     CancelableProgressable reporter = new CancelableProgressable() {
@@ -470,7 +469,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
         if ((t - last_report_at) > reportPeriod) {
           last_report_at = t;
           int latestZKVersion =
-              attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue());
+              attemptToOwnTask(false, watcher, serverName, curTask, mode, zkVersion.intValue());
           if (latestZKVersion < 0) {
             LOG.warn("Failed to heartbeat the task" + curTask);
             return false;
@@ -481,9 +480,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
       }
     };
     
-    HLogSplitterHandler hsh =
-        new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress,
-            this.splitTaskExecutor);
+    HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, 
+      this.tasksInProgress, this.splitTaskExecutor, mode);
     this.executorService.submit(hsh);
   }
 
@@ -658,6 +656,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
       RESIGNED(),
       PREEMPTED()
     }
-    Status exec(String name, CancelableProgressable p);
+    Status exec(String name, RecoveryMode mode, CancelableProgressable p);
   }
 }