Posted to commits@hbase.apache.org by st...@apache.org on 2016/02/24 16:14:08 UTC

[1/3] hbase git commit: HBASE-15302 Reenable the other tests disabled by HBASE-14678

Repository: hbase
Updated Branches:
  refs/heads/master 876a6ab73 -> 30cec72f9


http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
new file mode 100644
index 0000000..125f5a1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestMasterFailoverWithProcedures {
+  private static final Log LOG = LogFactory.getLog(TestMasterFailoverWithProcedures.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static void setupConf(Configuration conf) {
+    // don't waste time retrying with the roll; the test is already slow enough.
+    conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1);
+    conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0);
+    conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1);
+    conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 1);
+
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, false);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, false);
+  }
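+
+  // setup() clears the kill-injection flags so every test starts from a clean executor;
+  // the failover tests below re-enable them right before submitting their procedure
+  // (see ProcedureTestingUtility for the flags' exact semantics).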
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testWalRecoverLease() throws Exception {
+    final ProcedureStore masterStore = getMasterProcedureExecutor().getStore();
+    assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore);
+
+    HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+    // Abort Latch for the master store
+    final CountDownLatch masterStoreAbort = new CountDownLatch(1);
+    masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
+      @Override
+      public void postSync() {}
+
+      @Override
+      public void abortProcess() {
+        LOG.debug("Abort store of Master");
+        masterStoreAbort.countDown();
+      }
+    });
+
+    // Start up a fake master; the new WAL store will take the lease
+    // and the active master should abort.
+    HMaster backupMaster3 = Mockito.mock(HMaster.class);
+    Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+    Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+    final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
+        firstMaster.getMasterFileSystem().getFileSystem(),
+        ((WALProcedureStore)masterStore).getLogDir(),
+        new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+    // Abort Latch for the test store
+    final CountDownLatch backupStore3Abort = new CountDownLatch(1);
+    backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
+      @Override
+      public void postSync() {}
+
+      @Override
+      public void abortProcess() {
+        LOG.debug("Abort store of backupMaster3");
+        backupStore3Abort.countDown();
+        backupStore3.stop(true);
+      }
+    });
+    backupStore3.start(1);
+    backupStore3.recoverLease();
+
+    // Try to trigger a command on the master (WAL lease expired on the active one)
+    HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f");
+    HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null);
+    LOG.debug("submit proc");
+    try {
+      getMasterProcedureExecutor().submitProcedure(
+          new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
+      fail("expected RuntimeException 'sync aborted'");
+    } catch (RuntimeException e) {
+      LOG.info("got " + e.getMessage());
+    }
+    LOG.debug("wait master store abort");
+    masterStoreAbort.await();
+
+    // Now the real backup master should start up
+    LOG.debug("wait backup master to startup");
+    waitBackupMaster(UTIL, firstMaster);
+    assertTrue(firstMaster.isStopped());
+
+    // wait for the store here to abort (the test will fail due to timeout if it doesn't)
+    LOG.debug("wait the store to abort");
+    backupStore3.getStoreTracker().setDeleted(1, false);
+    try {
+      backupStore3.delete(1);
+      fail("expected RuntimeException 'sync aborted'");
+    } catch (RuntimeException e) {
+      LOG.info("got " + e.getMessage());
+    }
+    backupStore3Abort.await();
+  }
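+
+  // The takeover sequence exercised above, in isolation (a sketch; the method names are the
+  // ones used in this test, the ordering notes are our reading of it):
+  //
+  //   WALProcedureStore newStore = new WALProcedureStore(conf, fs, logDir, leaseRecovery);
+  //   newStore.start(1);        // start the store's slots/sync machinery
+  //   newStore.recoverLease();  // take ownership of the shared log directory
+  //   // From here on, any sync attempt by the previous owner should fail and its
+  //   // ProcedureStoreListener.abortProcess() should fire -- which is what the latches
+  //   // in this test wait for.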
+
+  /**
+   * Tests proper fencing when the old WAL store attempts a write before the new store has
+   * done any WAL rolls.
+   */
+  @Test
+  public void testWALfencingWithoutWALRolling() throws IOException {
+    testWALfencing(false);
+  }
+
+  /**
+   * Tests proper fencing in case the current WAL store does not receive writes until after the
+   * new WAL does a couple of WAL rolls.
+   */
+  @Test
+  public void testWALfencingWithWALRolling() throws IOException {
+    testWALfencing(true);
+  }
+
+  public void testWALfencing(boolean walRolls) throws IOException {
+    final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
+    assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
+
+    HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+
+    // cause WAL rolling after a delete in WAL:
+    firstMaster.getConfiguration().setLong("hbase.procedure.store.wal.roll.threshold", 1);
+
+    HMaster backupMaster3 = Mockito.mock(HMaster.class);
+    Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+    Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+    final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
+        firstMaster.getMasterFileSystem().getFileSystem(),
+        ((WALProcedureStore)procStore).getLogDir(),
+        new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+
+    // start a second store which should fence the first one out
+    LOG.info("Starting new WALProcedureStore");
+    procStore2.start(1);
+    procStore2.recoverLease();
+
+    // Before writing back to the first WAL store, optionally do a couple of WAL rolls (which
+    // cause the old WAL files to be deleted).
+    if (walRolls) {
+      LOG.info("Inserting into second WALProcedureStore, causing WAL rolls");
+      for (int i = 0; i < 512; i++) {
+        // insert something to the second store then delete it, causing a WAL roll(s)
+        Procedure proc2 = new TestProcedure(i);
+        procStore2.insert(proc2, null);
+        procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later
+      }
+    }
+
+    // Now, inserting something into the first store should fail.
+    // If the store does a WAL roll and continues with another logId without checking for
+    // higher logIds, it will incorrectly succeed.
+    LOG.info("Inserting into first WALProcedureStore");
+    try {
+      procStore.insert(new TestProcedure(11), null);
+      fail("Inserting into Procedure Store should have failed");
+    } catch (Exception ex) {
+      LOG.info("Received expected exception", ex);
+    }
+  }
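+
+  // Why the WAL rolls matter (our reading of the scenario, not the store's actual code):
+  // each WAL file carries a monotonically increasing logId. If the fenced store only rolled
+  // to its own next logId, it could be fooled once the new owner's rolls have deleted the
+  // old files; it must instead check for the existence of any higher logId before concluding
+  // it still owns the directory.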
+
+  // ==========================================================================
+  //  Test Create Table
+  // ==========================================================================
+  @Test(timeout=60000)
+  public void testCreateWithFailover() throws Exception {
+    // TODO: Should we try every step? (master failover takes a long time)
+    // It is already covered by TestCreateTableProcedure,
+    // but there only the executor/store is restarted, not the master.
+    // Without a Master restart we may not find bugs in the procedure code,
+    // such as a missing "wait" for resources to be available (e.g. RS).
+    testCreateWithFailoverAtStep(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS.ordinal());
+  }
+
+  private void testCreateWithFailoverAtStep(final int step) throws Exception {
+    final TableName tableName = TableName.valueOf("testCreateWithFailoverAtStep" + step);
+
+    // create the table
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+    // Start the Create procedure && kill the executor
+    byte[][] splitKeys = null;
+    HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2");
+    HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys);
+    long procId = procExec.submitProcedure(
+        new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
+    testRecoveryAndDoubleExecution(UTIL, procId, step, CreateTableState.values());
+
+    MasterProcedureTestingUtility.validateTableCreation(
+        UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+  }
+
+  // ==========================================================================
+  //  Test Delete Table
+  // ==========================================================================
+  @Test(timeout=60000)
+  public void testDeleteWithFailover() throws Exception {
+    // TODO: Should we try every step? (master failover takes a long time)
+    // It is already covered by TestDeleteTableProcedure,
+    // but there only the executor/store is restarted, not the master.
+    // Without a Master restart we may not find bugs in the procedure code,
+    // such as a missing "wait" for resources to be available (e.g. RS).
+    testDeleteWithFailoverAtStep(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS.ordinal());
+  }
+
+  private void testDeleteWithFailoverAtStep(final int step) throws Exception {
+    final TableName tableName = TableName.valueOf("testDeleteWithFailoverAtStep" + step);
+
+    // create the table
+    byte[][] splitKeys = null;
+    HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+        getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+    Path tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+    MasterProcedureTestingUtility.validateTableCreation(
+        UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+    UTIL.getHBaseAdmin().disableTable(tableName);
+
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+    // Start the Delete procedure && kill the executor
+    long procId = procExec.submitProcedure(
+        new DeleteTableProcedure(procExec.getEnvironment(), tableName));
+    testRecoveryAndDoubleExecution(UTIL, procId, step, DeleteTableState.values());
+
+    MasterProcedureTestingUtility.validateTableDeletion(
+        UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+  }
+
+  // ==========================================================================
+  //  Test Truncate Table
+  // ==========================================================================
+  @Test(timeout=90000)
+  public void testTruncateWithFailover() throws Exception {
+    // TODO: Should we try every step? (master failover takes a long time)
+    // It is already covered by TestTruncateTableProcedure,
+    // but there only the executor/store is restarted, not the master.
+    // Without a Master restart we may not find bugs in the procedure code,
+    // such as a missing "wait" for resources to be available (e.g. RS).
+    testTruncateWithFailoverAtStep(true, TruncateTableState.TRUNCATE_TABLE_ADD_TO_META.ordinal());
+  }
+
+  private void testTruncateWithFailoverAtStep(final boolean preserveSplits, final int step)
+      throws Exception {
+    final TableName tableName = TableName.valueOf("testTruncateWithFailoverAtStep" + step);
+
+    // create the table
+    final String[] families = new String[] { "f1", "f2" };
+    final byte[][] splitKeys = new byte[][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+    };
+    HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+        getMasterProcedureExecutor(), tableName, splitKeys, families);
+    // load and verify that there are rows in the table
+    MasterProcedureTestingUtility.loadData(
+        UTIL.getConnection(), tableName, 100, splitKeys, families);
+    assertEquals(100, UTIL.countRows(tableName));
+    // disable the table
+    UTIL.getHBaseAdmin().disableTable(tableName);
+
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+    // Start the Truncate procedure && kill the executor
+    long procId = procExec.submitProcedure(
+        new TruncateTableProcedure(procExec.getEnvironment(), tableName, preserveSplits));
+    testRecoveryAndDoubleExecution(UTIL, procId, step, TruncateTableState.values());
+
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
+    UTIL.waitUntilAllRegionsAssigned(tableName);
+
+    // validate the table regions and layout
+    if (preserveSplits) {
+      assertEquals(1 + splitKeys.length, UTIL.getHBaseAdmin().getTableRegions(tableName).size());
+    } else {
+      regions = UTIL.getHBaseAdmin().getTableRegions(tableName).toArray(new HRegionInfo[1]);
+      assertEquals(1, regions.length);
+    }
+    MasterProcedureTestingUtility.validateTableCreation(
+        UTIL.getHBaseCluster().getMaster(), tableName, regions, families);
+
+    // verify that there are no rows in the table
+    assertEquals(0, UTIL.countRows(tableName));
+
+    // verify that the table is read/writable
+    MasterProcedureTestingUtility.loadData(
+        UTIL.getConnection(), tableName, 50, splitKeys, families);
+    assertEquals(50, UTIL.countRows(tableName));
+  }
+
+  // ==========================================================================
+  //  Test Disable Table
+  // ==========================================================================
+  @Test(timeout=60000)
+  public void testDisableTableWithFailover() throws Exception {
+    // TODO: Should we try every step? (master failover takes a long time)
+    // It is already covered by TestDisableTableProcedure,
+    // but there only the executor/store is restarted, not the master.
+    // Without a Master restart we may not find bugs in the procedure code,
+    // such as a missing "wait" for resources to be available (e.g. RS).
+    testDisableTableWithFailoverAtStep(
+        DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE.ordinal());
+  }
+
+  private void testDisableTableWithFailoverAtStep(final int step) throws Exception {
+    final TableName tableName = TableName.valueOf("testDisableTableWithFailoverAtStep" + step);
+
+    // create the table
+    final byte[][] splitKeys = new byte[][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+    };
+    MasterProcedureTestingUtility.createTable(
+        getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+    // Start the Disable procedure && kill the executor
+    long procId = procExec.submitProcedure(
+        new DisableTableProcedure(procExec.getEnvironment(), tableName, false));
+    testRecoveryAndDoubleExecution(UTIL, procId, step, DisableTableState.values());
+
+    MasterProcedureTestingUtility.validateTableIsDisabled(
+        UTIL.getHBaseCluster().getMaster(), tableName);
+  }
+
+  // ==========================================================================
+  //  Test Enable Table
+  // ==========================================================================
+  @Test(timeout=60000)
+  public void testEnableTableWithFailover() throws Exception {
+    // TODO: Should we try every step? (master failover takes a long time)
+    // It is already covered by TestEnableTableProcedure,
+    // but there only the executor/store is restarted, not the master.
+    // Without a Master restart we may not find bugs in the procedure code,
+    // such as a missing "wait" for resources to be available (e.g. RS).
+    testEnableTableWithFailoverAtStep(
+        EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE.ordinal());
+  }
+
+  private void testEnableTableWithFailoverAtStep(final int step) throws Exception {
+    final TableName tableName = TableName.valueOf("testEnableTableWithFailoverAtStep" + step);
+
+    // create the table
+    final byte[][] splitKeys = new byte[][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+    };
+    MasterProcedureTestingUtility.createTable(
+        getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+    UTIL.getHBaseAdmin().disableTable(tableName);
+
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+    // Start the Enable procedure && kill the executor
+    long procId = procExec.submitProcedure(
+        new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
+    testRecoveryAndDoubleExecution(UTIL, procId, step, EnableTableState.values());
+
+    MasterProcedureTestingUtility.validateTableIsEnabled(
+        UTIL.getHBaseCluster().getMaster(), tableName);
+  }
+
+  // ==========================================================================
+  //  Test Helpers
+  // ==========================================================================
+  public static <TState> void testRecoveryAndDoubleExecution(final HBaseTestingUtility testUtil,
+      final long procId, final int lastStepBeforeFailover, TState[] states) throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+        testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+    for (int i = 0; i < lastStepBeforeFailover; ++i) {
+      LOG.info("Restart "+ i +" exec state: " + states[i]);
+      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+      ProcedureTestingUtility.restart(procExec);
+      ProcedureTestingUtility.waitProcedure(procExec, procId);
+    }
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+
+    LOG.info("Trigger master failover");
+    masterFailover(testUtil);
+
+    procExec = testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+  }
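+
+  // Usage pattern (mirroring the failover tests above): enable kill-before-store-update,
+  // submit the procedure, then let this helper restart the executor once per state, e.g.:
+  //
+  //   long procId = procExec.submitProcedure(new CreateTableProcedure(env, htd, regions));
+  //   testRecoveryAndDoubleExecution(UTIL, procId, step, CreateTableState.values());
+  //
+  // Each restart replays the persisted steps, so this also exercises the idempotence of
+  // every step executed before the kill.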
+
+  // ==========================================================================
+  //  Master failover utils
+  // ==========================================================================
+  public static void masterFailover(final HBaseTestingUtility testUtil)
+      throws Exception {
+    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
+
+    // Kill the master
+    HMaster oldMaster = cluster.getMaster();
+    cluster.killMaster(cluster.getMaster().getServerName());
+
+    // Wait for the secondary to take over
+    waitBackupMaster(testUtil, oldMaster);
+  }
+
+  public static void waitBackupMaster(final HBaseTestingUtility testUtil,
+      final HMaster oldMaster) throws Exception {
+    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
+
+    HMaster newMaster = cluster.getMaster();
+    while (newMaster == null || newMaster == oldMaster) {
+      Thread.sleep(250);
+      newMaster = cluster.getMaster();
+    }
+
+    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
+      Thread.sleep(250);
+    }
+  }
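+
+  // Two separate loops are needed above: the first waits for the cluster to expose a new
+  // HMaster instance at all, the second waits for that instance to become active and finish
+  // initializing; returning after the first loop alone could hand back an unusable master.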
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private MasterProcedureEnv getMasterProcedureEnv() {
+    return getMasterProcedureExecutor().getEnvironment();
+  }
+
+  private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+    return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+  }
+
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private Path getTempDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getTempDir();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java
new file mode 100644
index 0000000..fe297edc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test creating/using/deleting snapshots from the client
+ * <p>
+ * This is an end-to-end test for the snapshot utility
+ *
+ * TODO This is essentially a clone of TestFlushSnapshotFromClient.  This is worth refactoring
+ * because there will be a few more flavors of snapshots that need to run these tests.
+ */
+@Category({ClientTests.class, LargeTests.class})
+public class TestMobFlushSnapshotFromClient extends TestFlushSnapshotFromClient {
+  private static final Log LOG = LogFactory.getLog(TestMobFlushSnapshotFromClient.class);
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(3);
+  }
+
+  protected static void setupConf(Configuration conf) {
+    TestFlushSnapshotFromClient.setupConf(conf);
+    conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+  }
+
+  @Override
+  protected void createTable() throws Exception {
+    MobSnapshotTestingUtils.createMobTable(UTIL, TABLE_NAME, 1, TEST_FAM);
+  }
+
+  @Override
+  protected void verifyRowCount(final HBaseTestingUtility util, final TableName tableName,
+      long expectedRows) throws IOException {
+    MobSnapshotTestingUtils.verifyMobRowCount(util, tableName, expectedRows);
+  }
+
+  @Override
+  protected int countRows(final Table table, final byte[]... families) throws IOException {
+    return MobSnapshotTestingUtils.countMobRows(table, families);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
new file mode 100644
index 0000000..67fc60a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -0,0 +1,1320 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Testing {@link WAL} splitting code.
+ */
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestWALSplit {
+  {
+    // Uncomment the following lines if more verbosity is needed for
+    // debugging (see HBASE-12285 for details).
+    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+  }
+  private final static Log LOG = LogFactory.getLog(TestWALSplit.class);
+
+  private static Configuration conf;
+  private FileSystem fs;
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private Path HBASEDIR;
+  private Path WALDIR;
+  private Path OLDLOGDIR;
+  private Path CORRUPTDIR;
+  private Path TABLEDIR;
+
+  private static final int NUM_WRITERS = 10;
+  private static final int ENTRIES = 10; // entries per writer per region
+
+  private static final String FILENAME_BEING_SPLIT = "testfile";
+  private static final TableName TABLE_NAME =
+      TableName.valueOf("t1");
+  private static final byte[] FAMILY = "f1".getBytes();
+  private static final byte[] QUALIFIER = "q1".getBytes();
+  private static final byte[] VALUE = "v1".getBytes();
+  private static final String WAL_FILE_PREFIX = "wal.dat.";
+  private static List<String> REGIONS = new ArrayList<String>();
+  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
+  private static String ROBBER;
+  private static String ZOMBIE;
+  private static String [] GROUP = new String [] {"supergroup"};
+  private RecoveryMode mode;
+
+  static enum Corruptions {
+    INSERT_GARBAGE_ON_FIRST_LINE,
+    INSERT_GARBAGE_IN_THE_MIDDLE,
+    APPEND_GARBAGE,
+    TRUNCATE,
+    TRUNCATE_TRAILER
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setClass("hbase.regionserver.hlog.writer.impl",
+        InstrumentedLogWriter.class, Writer.class);
+    conf.setBoolean("dfs.support.broken.append", true);
+    conf.setBoolean("dfs.support.append", true);
+    // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
+    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+    // Create a fake user-to-group mapping and set it in the conf.
+    Map<String, String []> u2g_map = new HashMap<String, String []>(2);
+    ROBBER = User.getCurrent().getName() + "-robber";
+    ZOMBIE = User.getCurrent().getName() + "-zombie";
+    u2g_map.put(ROBBER, GROUP);
+    u2g_map.put(ZOMBIE, GROUP);
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.startMiniDFSCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
+  @Rule
+  public TestName name = new TestName();
+  private WALFactory wals = null;
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Cleaning up cluster for new test.");
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    HBASEDIR = TEST_UTIL.createRootDir();
+    OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
+    CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);
+    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
+    REGIONS.clear();
+    Collections.addAll(REGIONS, "bbb", "ccc");
+    InstrumentedLogWriter.activateFailure = false;
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+    wals = new WALFactory(conf, null, name.getMethodName());
+    WALDIR = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
+    //fs.mkdirs(WALDIR);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      wals.close();
+    } catch(IOException exception) {
+      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
+      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
+          " you see a failure look here.");
+      LOG.debug("exception details", exception);
+    } finally {
+      wals = null;
+      fs.delete(HBASEDIR, true);
+    }
+  }
+
+  /**
+   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
+   * Ensures we do not lose edits.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
+    final AtomicLong counter = new AtomicLong(0);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // Region we'll write edits to and then later examine to make sure they all made it in.
+    final String region = REGIONS.get(0);
+    final int numWriters = 3;
+    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
+    try {
+      long startCount = counter.get();
+      zombie.start();
+      // Wait till writer starts going.
+      while (startCount == counter.get()) Threads.sleep(1);
+      // Give it a second to write a few appends.
+      Threads.sleep(1000);
+      final Configuration conf2 = HBaseConfiguration.create(this.conf);
+      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
+      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
+        @Override
+        public Integer run() throws Exception {
+          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
+              .append("):\n");
+          for (FileStatus status : fs.listStatus(WALDIR)) {
+            ls.append("\t").append(status.toString()).append("\n");
+          }
+          LOG.debug(ls);
+          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
+          WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
+          LOG.info("Finished splitting out from under zombie.");
+          Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
+          int count = 0;
+          for (Path logfile: logfiles) {
+            count += countWAL(logfile);
+          }
+          return count;
+        }
+      });
+      LOG.info("zombie=" + counter.get() + ", robber=" + count);
+      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
+              "Zombie could write " + counter.get() + " and logfile had only " + count,
+          counter.get() == count || counter.get() + 1 == count);
+    } finally {
+      stop.set(true);
+      zombie.interrupt();
+      Threads.threadDumpingIsAlive(zombie);
+    }
+  }
+
+  /**
+   * This thread will keep writing to a 'wal' file even after the split process has started.
+   * It simulates a region server that was considered dead but woke up and wrote some more to the
+   * last log entry. Does its writing as an alternate user in another filesystem instance to
+   * better simulate it being a regionserver.
+   */
+  class ZombieLastLogWriterRegionServer extends Thread {
+    final AtomicLong editsCount;
+    final AtomicBoolean stop;
+    final int numOfWriters;
+    /**
+     * Region to write edits for.
+     */
+    final String region;
+    final User user;
+
+    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
+        final String region, final int writers)
+        throws IOException, InterruptedException {
+      super("ZombieLastLogWriterRegionServer");
+      setDaemon(true);
+      this.stop = stop;
+      this.editsCount = counter;
+      this.region = region;
+      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
+      numOfWriters = writers;
+    }
+
+    @Override
+    public void run() {
+      try {
+        doWriting();
+      } catch (IOException e) {
+        LOG.warn(getName() + " Writer exiting " + e);
+      } catch (InterruptedException e) {
+        LOG.warn(getName() + " Writer exiting " + e);
+      }
+    }
+
+    private void doWriting() throws IOException, InterruptedException {
+      this.user.runAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          // Index of the WAL we want to keep open.  generateWALs will leave open the WAL whose
+          // index we supply here.
+          int walToKeepOpen = numOfWriters - 1;
+          // The below method writes numOfWriters files each with ENTRIES entries for a total of
+          // numOfWriters * ENTRIES added per column family in the region.
+          Writer writer = null;
+          try {
+            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
+          } catch (IOException e1) {
+            throw new RuntimeException("Failed", e1);
+          }
+          // Update the counter so it has all edits written so far.
+          editsCount.addAndGet(numOfWriters * ENTRIES);
+          loop(writer);
+          // If we've been interrupted, then things should have shifted out from under us;
+          // closing should error.
+          try {
+            writer.close();
+            fail("Writing closing after parsing should give an error.");
+          } catch (IOException exception) {
+            LOG.debug("ignoring error when closing final writer.", exception);
+          }
+          return null;
+        }
+      });
+    }
+
+    private void loop(final Writer writer) {
+      byte [] regionBytes = Bytes.toBytes(this.region);
+      while (!stop.get()) {
+        try {
+          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
+              ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0);
+          long count = editsCount.incrementAndGet();
+          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
+          try {
+            Thread.sleep(1);
+          } catch (InterruptedException e) {
+            //
+          }
+        } catch (IOException ex) {
+          LOG.error(getName() + " ex " + ex.toString());
+          if (ex instanceof RemoteException) {
+            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
+                " while writing " + (editsCount.get() + 1));
+          } else {
+            LOG.error(getName() + " failed to write....at " + editsCount.get());
+            fail("Failed to write " + editsCount.get());
+          }
+          break;
+        } catch (Throwable t) {
+          LOG.error(getName() + " HOW? " + t);
+          LOG.debug("exception details", t);
+          break;
+        }
+      }
+      LOG.info(getName() + " Writer exiting");
+    }
+  }
+
+  /**
+   * @throws IOException
+   * @see https://issues.apache.org/jira/browse/HBASE-3020
+   */
+  @Test (timeout=300000)
+  public void testRecoveredEditsPathForMeta() throws IOException {
+    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
+    Path regiondir = new Path(tdir,
+        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+    fs.mkdirs(regiondir);
+    long now = System.currentTimeMillis();
+    Entry entry =
+        new Entry(new WALKey(encoded,
+            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+            new WALEdit());
+    Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
+        FILENAME_BEING_SPLIT);
+    String parentOfParent = p.getParent().getParent().getName();
+    assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
+  }
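+
+  // Layout asserted above (derived from the assertions; <edits-file> stands in for whatever
+  // name getRegionSplitEditsPath chooses):
+  //
+  //   .../<meta-table-dir>/<encoded-region-name>/recovered.edits/<edits-file>
+  //
+  // so p.getParent() is the recovered.edits dir and p.getParent().getParent() is the
+  // region directory.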
+
+  /**
+   * Test that an old recovered.edits file doesn't break WALSplitter.
+   * This is useful when upgrading old instances.
+   */
+  @Test (timeout=300000)
+  public void testOldRecoveredEditsFileSidelined() throws IOException {
+    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
+    Path regiondir = new Path(tdir,
+        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+    fs.mkdirs(regiondir);
+    long now = System.currentTimeMillis();
+    Entry entry =
+        new Entry(new WALKey(encoded,
+            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+            new WALEdit());
+    Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
+    fs.createNewFile(parent); // create a recovered.edits file
+
+    Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
+        FILENAME_BEING_SPLIT);
+    String parentOfParent = p.getParent().getParent().getName();
+    assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
+    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
+  }
+
+  private void useDifferentDFSClient() throws IOException {
+    // make fs act as a different client now
+    // initialize will create a new DFSClient with a new client ID
+    fs.initialize(fs.getUri(), conf);
+  }
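+
+  // HDFS tracks leases per DFSClient, so re-initializing hands the splitter a client distinct
+  // from the one that wrote the WALs; splitting through the original client would sidestep
+  // the lease-recovery behavior these tests mean to exercise (our reading of why this helper
+  // exists).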
+
+  @Test (timeout=300000)
+  public void testSplitPreservesEdits() throws IOException{
+    final String REGION = "region__1";
+    REGIONS.clear();
+    REGIONS.add(REGION);
+
+    generateWALs(1, 10, -1);
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
+
+    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
+  }
+
+  /**
+   * @param expectedEntries -1 to not assert
+   * @return the count across all regions
+   */
+  private int splitAndCount(final int expectedFiles, final int expectedEntries)
+      throws IOException {
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    int result = 0;
+    for (String region : REGIONS) {
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countWAL(logfile);
+      }
+      if (-1 != expectedEntries) {
+        assertEquals(expectedEntries, count);
+      }
+      result += count;
+    }
+    return result;
+  }
+
+  @Test (timeout=300000)
+  public void testEmptyLogFiles() throws IOException {
+    testEmptyLogFiles(true);
+  }
+
+  @Test (timeout=300000)
+  public void testEmptyOpenLogFiles() throws IOException {
+    testEmptyLogFiles(false);
+  }
+
+  private void testEmptyLogFiles(final boolean close) throws IOException {
+    // we won't create the hlog dir until getWAL gets called, so
+    // make the dir here when testing empty log files
+    fs.mkdirs(WALDIR);
+    injectEmptyFile(".empty", close);
+    generateWALs(Integer.MAX_VALUE);
+    injectEmptyFile("empty", close);
+    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
+  }
+
+  @Test (timeout=300000)
+  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
+    // generate logs but leave wal.dat.5 open.
+    generateWALs(5);
+    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
+  }
+
+  @Test (timeout=300000)
+  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateWALs(Integer.MAX_VALUE);
+    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
+        Corruptions.APPEND_GARBAGE, true);
+    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
+  }
+
+  @Test (timeout=300000)
+  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateWALs(Integer.MAX_VALUE);
+    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
+        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
+    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
+  }
+
+  @Test (timeout=300000)
+  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateWALs(Integer.MAX_VALUE);
+    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
+        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
+    // The entries in the original logs alternate between regions. Allowing for the sequence
+    // file header, the middle corruption should affect at least half of the entries in the
+    // corrupted file.
+    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
+    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
+    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
+    assertTrue("The file up to the corrupted area hasn't been parsed",
+        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
+  }
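+
+  // Worked lower bound with this file's defaults (NUM_WRITERS = 10, ENTRIES = 10, and the
+  // two regions set up in setUp()): goodEntries = 9 * 10 = 90 per region from the untouched
+  // files, firstHalfEntries = ceil(10 / 2.0) - 1 = 4 from the corrupted one, so the split
+  // must recover at least 2 * (90 + 4) = 188 entries.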
+
+  @Test (timeout=300000)
+  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    for (FaultySequenceFileLogReader.FailureType  failureType :
+        FaultySequenceFileLogReader.FailureType.values()) {
+      final Set<String> walDirContents = splitCorruptWALs(failureType);
+      final Set<String> archivedLogs = new HashSet<String>();
+      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
+      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
+        archived.append("\n\t").append(log.toString());
+        archivedLogs.add(log.getPath().getName());
+      }
+      LOG.debug(archived.toString());
+      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
+          walDirContents, archivedLogs);
+    }
+  }
+
+  /**
+   * @return set of wal names present prior to split attempt.
+   * @throws IOException if the split process fails
+   */
+  private Set<String> splitCorruptWALs(final FaultySequenceFileLogReader.FailureType failureType)
+      throws IOException {
+    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+        Reader.class);
+    InstrumentedLogWriter.activateFailure = false;
+
+    try {
+      conf.setClass("hbase.regionserver.hlog.reader.impl",
+          FaultySequenceFileLogReader.class, Reader.class);
+      conf.set("faultysequencefilelogreader.failuretype", failureType.name());
+      // Clean up from previous tests or previous loop
+      try {
+        wals.shutdown();
+      } catch (IOException exception) {
+        // since we're splitting out from under the factory, we should expect some closing failures.
+        LOG.debug("Ignoring problem closing WALFactory.", exception);
+      }
+      wals.close();
+      try {
+        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
+          fs.delete(log.getPath(), true);
+        }
+      } catch (FileNotFoundException exception) {
+        LOG.debug("no previous CORRUPTDIR to clean.");
+      }
+      // change to the faulty reader
+      wals = new WALFactory(conf, null, name.getMethodName());
+      generateWALs(-1);
+      // Our reader will render all of these files corrupt.
+      final Set<String> walDirContents = new HashSet<String>();
+      for (FileStatus status : fs.listStatus(WALDIR)) {
+        walDirContents.add(status.getPath().getName());
+      }
+      useDifferentDFSClient();
+      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+      return walDirContents;
+    } finally {
+      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
+          Reader.class);
+    }
+  }
+
+  @Test (timeout=300000, expected = IOException.class)
+  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
+      throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
+  }
+
+  @Test (timeout=300000)
+  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
+      throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    try {
+      splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
+    } catch (IOException e) {
+      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
+    }
+    assertEquals("if skip.errors is false all files should remain in place",
+        NUM_WRITERS, fs.listStatus(WALDIR).length);
+  }
+
+  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
+      final int expectedCount) throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+    final String REGION = "region__1";
+    REGIONS.clear();
+    REGIONS.add(REGION);
+
+    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
+    generateWALs(1, entryCount, -1);
+    corruptWAL(c1, corruption, true);
+
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
+
+    int actualCount = 0;
+    Reader in = wals.createReader(fs, splitLog[0]);
+    @SuppressWarnings("unused")
+    Entry entry;
+    while ((entry = in.next()) != null) ++actualCount;
+    assertEquals(expectedCount, actualCount);
+    in.close();
+
+    // should not have stored the EOF files as corrupt
+    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
+    assertEquals(0, archivedLogs.length);
+
+  }
+
+  @Test (timeout=300000)
+  public void testEOFisIgnored() throws IOException {
+    int entryCount = 10;
+    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1);
+  }
+
+  @Test (timeout=300000)
+  public void testCorruptWALTrailer() throws IOException {
+    int entryCount = 10;
+    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
+  }
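+
+  // The expected counts above encode the EOF-handling contract: truncating mid-entry
+  // (Corruptions.TRUNCATE) silently drops only the final, partially written entry
+  // (entryCount - 1 survive), while truncating just the trailer (TRUNCATE_TRAILER) loses
+  // no entries at all -- and in neither case does ignoreCorruption() find the file archived
+  // as corrupt.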
+
+  @Test (timeout=300000)
+  public void testLogsGetArchivedAfterSplit() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    generateWALs(-1);
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
+    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
+  }
+
+  @Test (timeout=300000)
+  public void testSplit() throws IOException {
+    generateWALs(-1);
+    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
+  }
+
+  @Test (timeout=300000)
+  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
+      throws IOException {
+    generateWALs(-1);
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    FileStatus [] statuses = null;
+    try {
+      statuses = fs.listStatus(WALDIR);
+      if (statuses != null) {
+        fail("Files left in log dir: " +
+            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
+      }
+    } catch (FileNotFoundException e) {
+      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
+    }
+  }
+
+  @Test(timeout=300000, expected = IOException.class)
+  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
+    // leave the 5th log open so we can append the "trap"
+    Writer writer = generateWALs(4);
+    useDifferentDFSClient();
+
+    String region = "break";
+    Path regiondir = new Path(TABLEDIR, region);
+    fs.mkdirs(regiondir);
+
+    InstrumentedLogWriter.activateFailure = false;
+    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
+        ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+    writer.close();
+
+    try {
+      InstrumentedLogWriter.activateFailure = true;
+      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    } catch (IOException e) {
+      assertTrue(e.getMessage().
+          contains("This exception is instrumented and should only be thrown for testing"));
+      throw e;
+    } finally {
+      InstrumentedLogWriter.activateFailure = false;
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testSplitDeletedRegion() throws IOException {
+    REGIONS.clear();
+    String region = "region_that_splits";
+    REGIONS.add(region);
+
+    generateWALs(1);
+    useDifferentDFSClient();
+
+    Path regiondir = new Path(TABLEDIR, region);
+    fs.delete(regiondir, true);
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    assertFalse(fs.exists(regiondir));
+  }
+
+  @Test (timeout=300000)
+  public void testIOEOnOutputThread() throws Exception {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+    generateWALs(-1);
+    useDifferentDFSClient();
+    FileStatus[] logfiles = fs.listStatus(WALDIR);
+    assertTrue("There should be some log file",
+        logfiles != null && logfiles.length > 0);
+    // wals with no entries (like the one we don't use in the factory)
+    // won't cause a failure since nothing will ever be written.
+    // pick the largest one since it's most likely to have entries.
+    int largestLogFile = 0;
+    long largestSize = 0;
+    for (int i = 0; i < logfiles.length; i++) {
+      if (logfiles[i].getLen() > largestSize) {
+        largestLogFile = i;
+        largestSize = logfiles[i].getLen();
+      }
+    }
+    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
+    // Set up a splitter that will throw an IOE on the output side
+    WALSplitter logSplitter = new WALSplitter(wals,
+        conf, HBASEDIR, fs, null, null, this.mode) {
+      @Override
+      protected Writer createWriter(Path logfile) throws IOException {
+        Writer mockWriter = Mockito.mock(Writer.class);
+        Mockito.doThrow(new IOException("Injected")).when(
+            mockWriter).append(Mockito.<Entry>any());
+        return mockWriter;
+      }
+    };
+    // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
+    // the thread dumping in a background thread so it does not hold up the test.
+    final AtomicBoolean stop = new AtomicBoolean(false);
+    final Thread someOldThread = new Thread("Some-old-thread") {
+      @Override
+      public void run() {
+        while(!stop.get()) Threads.sleep(10);
+      }
+    };
+    someOldThread.setDaemon(true);
+    someOldThread.start();
+    final Thread t = new Thread("Background-thread-dumper") {
+      @Override
+      public void run() {
+        try {
+          Threads.threadDumpingIsAlive(someOldThread);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    t.setDaemon(true);
+    t.start();
+    try {
+      logSplitter.splitLogFile(logfiles[largestLogFile], null);
+      fail("Didn't throw!");
+    } catch (IOException ioe) {
+      assertTrue(ioe.toString().contains("Injected"));
+    } finally {
+      // Setting this to true will turn off the background thread dumper.
+      stop.set(true);
+    }
+  }
+
+  /**
+   * Runs a full split against a FileSystem spy that injects transient failures,
+   * then verifies the split still archives every WAL and removes the log dir.
+   * @param spiedFs should be instrumented for failure.
+   */
+  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
+    generateWALs(-1);
+    useDifferentDFSClient();
+
+    try {
+      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
+      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
+      assertFalse(fs.exists(WALDIR));
+    } catch (IOException e) {
+      fail("There shouldn't be any exception but: " + e.toString());
+    }
+  }
+
+  // Test for HBASE-3412
+  @Test (timeout=300000)
+  public void testMovedWALDuringRecovery() throws Exception {
+    // This partial mock will throw LEE for every file simulating
+    // files that were moved
+    FileSystem spiedFs = Mockito.spy(fs);
+    // The "File does not exist" part is very important,
+    // that's how it comes out of HDFS
+    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
+        when(spiedFs).append(Mockito.<Path>any());
+    retryOverHdfsProblem(spiedFs);
+  }
+
+  @Test (timeout=300000)
+  public void testRetryOpenDuringRecovery() throws Exception {
+    FileSystem spiedFs = Mockito.spy(fs);
+    // The "Cannot obtain block length", "Could not obtain the last block",
+    // and "Blocklist for [^ ]* has changed.*" part is very important,
+    // that's how it comes out of HDFS. If HDFS changes the exception
+    // message, this test needs to be adjusted accordingly.
+    //
+    // When DFSClient tries to open a file, HDFS needs to locate
+    // the last block of the file and get its length. However, if the
+    // last block is under recovery, HDFS may have problem to obtain
+    // the block length, in which case, retry may help.
+    Mockito.doAnswer(new Answer<FSDataInputStream>() {
+      private final String[] errors = new String[] {
+          "Cannot obtain block length", "Could not obtain the last block",
+          "Blocklist for " + OLDLOGDIR + " has changed"};
+      private int count = 0;
+
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        if (count < 3) {
+          throw new IOException(errors[count++]);
+        }
+        return (FSDataInputStream)invocation.callRealMethod();
+      }
+    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
+    retryOverHdfsProblem(spiedFs);
+  }
+
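+  // The reporter below always returns false from progress(), asking the splitter
+  // to terminate early; the spied-on slow open() lets the report period elapse
+  // so progress() actually gets polled.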
+  @Test (timeout=300000)
+  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
+    generateWALs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(WALDIR)[0];
+    useDifferentDFSClient();
+
+    final AtomicInteger count = new AtomicInteger();
+
+    CancelableProgressable localReporter
+        = new CancelableProgressable() {
+      @Override
+      public boolean progress() {
+        count.getAndIncrement();
+        return false;
+      }
+    };
+
+    FileSystem spiedFs = Mockito.spy(fs);
+    Mockito.doAnswer(new Answer<FSDataInputStream>() {
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(1500); // sleep long enough for the report period to elapse so progress() is invoked
+        return (FSDataInputStream)invocation.callRealMethod();
+      }
+    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
+
+    try {
+      conf.setInt("hbase.splitlog.report.period", 1000);
+      boolean ret = WALSplitter.splitLogFile(
+          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals);
+      assertFalse("Log splitting should failed", ret);
+      assertTrue(count.get() > 0);
+    } catch (IOException e) {
+      fail("There shouldn't be any exception but: " + e.toString());
+    } finally {
+      // reset it back to its default value
+      conf.setInt("hbase.splitlog.report.period", 59000);
+    }
+  }
+
+  /**
+   * Test log split process with fake data and lots of edits to trigger threading
+   * issues.
+   */
+  @Test (timeout=300000)
+  public void testThreading() throws Exception {
+    doTestThreading(20000, 128*1024*1024, 0);
+  }
+
+  /**
+   * Test blocking behavior of the log split process if writers are writing slower
+   * than the reader is reading.
+   */
+  @Test (timeout=300000)
+  public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    doTestThreading(200, 1024, 50);
+  }
+
+  /**
+   * Sets up a log splitter with a mock reader and writer. The mock reader generates
+   * a specified number of edits spread across 5 regions. The mock writer optionally
+   * sleeps for each edit it is fed.
+   *
+   * After the split is complete, verifies that the statistics show the correct number
+   * of edits output into each region.
+   *
+   * @param numFakeEdits number of fake edits to push through pipeline
+   * @param bufferSize size of in-memory buffer
+   * @param writerSlowness writer threads will sleep this many ms per edit
+   */
+  private void doTestThreading(final int numFakeEdits,
+      final int bufferSize,
+      final int writerSlowness) throws Exception {
+
+    Configuration localConf = new Configuration(conf);
+    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
+
+    // Create a fake log file (we'll override the reader to produce a stream of edits)
+    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
+    FSDataOutputStream out = fs.create(logPath);
+    out.close();
+
+    // Make region dirs for our destination regions so the output doesn't get skipped
+    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
+    makeRegionDirs(regions);
+
+    // Create a splitter that reads and writes the data without touching disk
+    WALSplitter logSplitter = new WALSplitter(wals,
+        localConf, HBASEDIR, fs, null, null, this.mode) {
+
+      /* Produce a mock writer that doesn't write anywhere */
+      @Override
+      protected Writer createWriter(Path logfile) throws IOException {
+        Writer mockWriter = Mockito.mock(Writer.class);
+        Mockito.doAnswer(new Answer<Void>() {
+          int expectedIndex = 0;
+
+          @Override
+          public Void answer(InvocationOnMock invocation) {
+            if (writerSlowness > 0) {
+              try {
+                Thread.sleep(writerSlowness);
+              } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+              }
+            }
+            Entry entry = (Entry) invocation.getArguments()[0];
+            WALEdit edit = entry.getEdit();
+            List<Cell> cells = edit.getCells();
+            assertEquals(1, cells.size());
+            Cell cell = cells.get(0);
+
+            // Check that the edits come in the right order.
+            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
+                cell.getRowLength()));
+            expectedIndex++;
+            return null;
+          }
+        }).when(mockWriter).append(Mockito.<Entry>any());
+        return mockWriter;
+      }
+
+      /* Produce a mock reader that generates fake entries */
+      @Override
+      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
+          throws IOException {
+        Reader mockReader = Mockito.mock(Reader.class);
+        Mockito.doAnswer(new Answer<Entry>() {
+          int index = 0;
+
+          @Override
+          public Entry answer(InvocationOnMock invocation) throws Throwable {
+            if (index >= numFakeEdits) return null;
+
+            // Generate r0 through r4 in round robin fashion
+            int regionIdx = index % regions.size();
+            byte[] region = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
+
+            Entry ret = createTestEntry(TABLE_NAME, region,
+                Bytes.toBytes((int)(index / regions.size())),
+                FAMILY, QUALIFIER, VALUE, index);
+            index++;
+            return ret;
+          }
+        }).when(mockReader).next();
+        return mockReader;
+      }
+    };
+
+    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
+
+    // Verify number of written edits per region
+    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
+    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
+      LOG.info("Got " + entry.getValue() + " output edits for region " +
+          Bytes.toString(entry.getKey()));
+      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
+    }
+    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
+  }
+
+  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
+  @Test (timeout=300000)
+  public void testSplitLogFileDeletedRegionDir() throws IOException {
+    LOG.info("testSplitLogFileDeletedRegionDir");
+    final String REGION = "region__1";
+    REGIONS.clear();
+    REGIONS.add(REGION);
+
+    generateWALs(1, 10, -1);
+    useDifferentDFSClient();
+
+    Path regiondir = new Path(TABLEDIR, REGION);
+    LOG.info("Region directory is" + regiondir);
+    fs.delete(regiondir, true);
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    assertFalse(fs.exists(regiondir));
+  }
+
+  @Test (timeout=300000)
+  public void testSplitLogFileEmpty() throws IOException {
+    LOG.info("testSplitLogFileEmpty");
+    // we won't create the hlog dir until getWAL is called, so
+    // make dir here when testing empty log file
+    fs.mkdirs(WALDIR);
+    injectEmptyFile(".empty", true);
+    useDifferentDFSClient();
+
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
+    assertFalse(fs.exists(tdir));
+
+    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
+  }
+
+  @Test (timeout=300000)
+  public void testSplitLogFileMultipleRegions() throws IOException {
+    LOG.info("testSplitLogFileMultipleRegions");
+    generateWALs(1, 10, -1);
+    splitAndCount(1, 10);
+  }
+
+  @Test (timeout=300000)
+  public void testSplitLogFileFirstLineCorruptionLog()
+      throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateWALs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(WALDIR)[0];
+
+    corruptWAL(logfile.getPath(),
+        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
+
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+
+    final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
+        "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
+    assertEquals(1, fs.listStatus(corruptDir).length);
+  }
+
+  /**
+   * Simulates a region replaying and deleting recovered.edits files while the
+   * split is still writing them, as in HBASE-4862.
+   * @throws IOException
+   * @see https://issues.apache.org/jira/browse/HBASE-4862
+   */
+  @Test (timeout=300000)
+  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
+    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
+    // Generate wals for our destination region
+    String regionName = "r0";
+    final Path regiondir = new Path(TABLEDIR, regionName);
+    REGIONS.clear();
+    REGIONS.add(regionName);
+    generateWALs(-1);
+
+    wals.getWAL(Bytes.toBytes(regionName), null);
+    FileStatus[] logfiles = fs.listStatus(WALDIR);
+    assertTrue("There should be some log file",
+        logfiles != null && logfiles.length > 0);
+
+    WALSplitter logSplitter = new WALSplitter(wals,
+        conf, HBASEDIR, fs, null, null, this.mode) {
+      @Override
+      protected Writer createWriter(Path logfile)
+          throws IOException {
+        Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
+        // After creating writer, simulate region's
+        // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
+        // region and delete them, excluding files with '.temp' suffix.
+        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+        if (files != null && !files.isEmpty()) {
+          for (Path file : files) {
+            if (!this.fs.delete(file, false)) {
+              LOG.error("Failed delete of " + file);
+            } else {
+              LOG.debug("Deleted recovered.edits file=" + file);
+            }
+          }
+        }
+        return writer;
+      }
+    };
+    try {
+      logSplitter.splitLogFile(logfiles[0], null);
+    } catch (IOException e) {
+      LOG.info(e);
+      fail("Got IOException while splitting the log; most likely the file being "
+          + "written no longer exists because of a concurrent replayRecoveredEditsIfAny()");
+    }
+    if (fs.exists(CORRUPTDIR)) {
+      if (fs.listStatus(CORRUPTDIR).length > 0) {
+        fail("There are some corrupt logs, "
+            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
+      }
+    }
+  }
+
+  private Writer generateWALs(int leaveOpen) throws IOException {
+    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
+  }
+
+  private void makeRegionDirs(List<String> regions) throws IOException {
+    for (String region : regions) {
+      LOG.debug("Creating dir for region " + region);
+      fs.mkdirs(new Path(TABLEDIR, region));
+    }
+  }
+
+  /**
+   * @param leaveOpen index to leave un-closed. -1 to close all.
+   * @return the writer that's still open, or null if all were closed.
+   */
+  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
+    makeRegionDirs(REGIONS);
+    fs.mkdirs(WALDIR);
+    Writer [] ws = new Writer[writers];
+    int seq = 0;
+    for (int i = 0; i < writers; i++) {
+      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
+      for (int j = 0; j < entries; j++) {
+        int prefix = 0;
+        for (String region : REGIONS) {
+          String row_key = region + prefix++ + i + j;
+          appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
+              VALUE, seq++);
+        }
+      }
+      if (i != leaveOpen) {
+        ws[i].close();
+        LOG.info("Closing writer " + i);
+      }
+    }
+    if (leaveOpen < 0 || leaveOpen >= writers) {
+      return null;
+    }
+    return ws[leaveOpen];
+  }
+
+  private Path[] getLogForRegion(Path rootdir, TableName table, String region)
+      throws IOException {
+    Path tdir = FSUtils.getTableDir(rootdir, table);
+    @SuppressWarnings("deprecation")
+    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+        Bytes.toString(region.getBytes())));
+    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (WALSplitter.isSequenceIdFile(p)) {
+          return false;
+        }
+        return true;
+      }
+    });
+    Path[] paths = new Path[files.length];
+    for (int i = 0; i < files.length; i++) {
+      paths[i] = files[i].getPath();
+    }
+    return paths;
+  }
+
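+  /**
+   * Reads the WAL at the given path fully into memory, then rewrites it with the
+   * requested corruption: garbage appended or inserted, or bytes truncated from
+   * the tail.
+   */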
+  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
+    FSDataOutputStream out;
+    int fileSize = (int) fs.listStatus(path)[0].getLen();
+
+    FSDataInputStream in = fs.open(path);
+    byte[] corrupted_bytes = new byte[fileSize];
+    in.readFully(0, corrupted_bytes, 0, fileSize);
+    in.close();
+
+    switch (corruption) {
+    case APPEND_GARBAGE:
+      fs.delete(path, false);
+      out = fs.create(path);
+      out.write(corrupted_bytes);
+      out.write("-----".getBytes());
+      closeOrFlush(close, out);
+      break;
+
+    case INSERT_GARBAGE_ON_FIRST_LINE:
+      fs.delete(path, false);
+      out = fs.create(path);
+      out.write(0);
+      out.write(corrupted_bytes);
+      closeOrFlush(close, out);
+      break;
+
+    case INSERT_GARBAGE_IN_THE_MIDDLE:
+      fs.delete(path, false);
+      out = fs.create(path);
+      int middle = corrupted_bytes.length / 2;
+      out.write(corrupted_bytes, 0, middle);
+      out.write(0);
+      out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
+      closeOrFlush(close, out);
+      break;
+
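+    // Truncate past the trailer (PB magic plus its length int) and a further
+    // 32 bytes, so the rewritten file ends partway through the final entry.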
+    case TRUNCATE:
+      fs.delete(path, false);
+      out = fs.create(path);
+      out.write(corrupted_bytes, 0, fileSize
+          - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
+      closeOrFlush(close, out);
+      break;
+
+    case TRUNCATE_TRAILER:
+      fs.delete(path, false);
+      out = fs.create(path);
+      out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
+      closeOrFlush(close, out);
+      break;
+    }
+  }
+
+  private void closeOrFlush(boolean close, FSDataOutputStream out)
+      throws IOException {
+    if (close) {
+      out.close();
+    } else {
+      Method syncMethod = null;
+      try {
+        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
+      } catch (NoSuchMethodException e) {
+        try {
+          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
+        } catch (NoSuchMethodException ex) {
+          throw new IOException("This version of Hadoop supports " +
+              "neither Syncable.sync() nor Syncable.hflush().");
+        }
+      }
+      try {
+        syncMethod.invoke(out, new Object[]{});
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      // hflush() is not present in older Hadoop versions, hence the reflection above.
+    }
+  }
+
+  private int countWAL(Path log) throws IOException {
+    int count = 0;
+    Reader in = wals.createReader(fs, log);
+    while (in.next() != null) {
+      count++;
+    }
+    in.close();
+    return count;
+  }
+
+  public static long appendEntry(Writer writer, TableName table, byte[] region,
+      byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, long seq)
+      throws IOException {
+    LOG.info(Thread.currentThread().getName() + " append");
+    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
+    LOG.info(Thread.currentThread().getName() + " sync");
+    writer.sync();
+    return seq;
+  }
+
+  private static Entry createTestEntry(
+      TableName table, byte[] region,
+      byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, long seq) {
+    long time = System.nanoTime();
+    WALEdit edit = new WALEdit();
+    seq++;
+    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
+    return new Entry(new WALKey(region, table, seq, time,
+        HConstants.DEFAULT_CLUSTER_ID), edit);
+  }
+
+  private void injectEmptyFile(String suffix, boolean closeFile)
+      throws IOException {
+    Writer writer = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix),
+        conf);
+    if (closeFile) writer.close();
+  }
+
+  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
+    Reader in1, in2;
+    in1 = wals.createReader(fs, p1);
+    in2 = wals.createReader(fs, p2);
+    Entry entry1;
+    Entry entry2;
+    while ((entry1 = in1.next()) != null) {
+      entry2 = in2.next();
+      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
+          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
+        return false;
+      }
+    }
+    in1.close();
+    in2.close();
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
new file mode 100644
index 0000000..f47951a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestWALSplitCompressed extends TestWALSplit {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALSplit.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
new file mode 100644
index 0000000..3f4af05
--- /dev/null
+++ b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.jruby.embed.PathType;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, LargeTests.class })
+public class TestReplicationShell extends AbstractTestShell {
+  @Ignore ("Disabled because hangs on occasion.. about 10% of the time") @Test
+  public void testRunShellTests() throws IOException {
+    System.setProperty("shell.test.include", "replication_admin_test.rb");
+    // Start all ruby tests
+    jruby.runScriptlet(PathType.ABSOLUTE, "src/test/ruby/tests_runner.rb");
+  }
+}
\ No newline at end of file


[2/3] hbase git commit: HBASE-15302 Reenable the other tests disabled by HBASE-14678

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
new file mode 100644
index 0000000..c5728cf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -0,0 +1,1799 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.NonceGenerator;
+import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
+import org.apache.hadoop.hbase.exceptions.OperationConflictException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MasterTests.class, LargeTests.class})
+@SuppressWarnings("deprecation")
+public class TestDistributedLogSplitting {
+  private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);
+  static {
+    // Uncomment the following line if more verbosity is needed for
+    // debugging (see HBASE-12285 for details).
+    //Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+
+    // testThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
+    // This turns it off for this test.  TODO: Figure out why scr breaks recovery.
+    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+
+  }
+
+  // Start a cluster with 2 masters and 5 regionservers
+  static final int NUM_MASTERS = 2;
+  static final int NUM_RS = 5;
+
+  MiniHBaseCluster cluster;
+  HMaster master;
+  Configuration conf;
+  static Configuration originalConf;
+  static HBaseTestingUtility TEST_UTIL;
+  static MiniDFSCluster dfsCluster;
+  static MiniZooKeeperCluster zkCluster;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
+    dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
+    zkCluster = TEST_UTIL.startMiniZKCluster();
+    originalConf = TEST_UTIL.getConfiguration();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+  }
+
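+  /**
+   * Shuts down any running mini HBase cluster and starts a fresh one with the
+   * current per-test conf, reusing the shared DFS and ZK mini clusters, then
+   * waits until the requested number of region servers is live.
+   */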
+  private void startCluster(int num_rs) throws Exception {
+    SplitLogCounters.resetCounters();
+    LOG.info("Starting cluster");
+    conf.getLong("hbase.splitlog.max.resubmit", 0);
+    // Make the failure test faster
+    conf.setInt("zookeeper.recovery.retry", 0);
+    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
+    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
+    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
+    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.setDFSCluster(dfsCluster);
+    TEST_UTIL.setZkCluster(zkCluster);
+    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
+    cluster = TEST_UTIL.getHBaseCluster();
+    LOG.info("Waiting for active/ready master");
+    cluster.waitForActiveAndReadyMaster();
+    master = cluster.getMaster();
+    while (cluster.getLiveRegionServerThreads().size() < num_rs) {
+      Threads.sleep(10);
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    // refresh configuration
+    conf = HBaseConfiguration.create(originalConf);
+  }
+
+  @After
+  public void after() throws Exception {
+    try {
+      if (TEST_UTIL.getHBaseCluster() != null) {
+        for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
+          mt.getMaster().abort("closing...", null);
+        }
+      }
+      TEST_UTIL.shutdownMiniHBaseCluster();
+    } finally {
+      TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
+      ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test (timeout=300000)
+  public void testRecoveredEdits() throws Exception {
+    LOG.info("testRecoveredEdits");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    startCluster(NUM_RS);
+
+    final int NUM_LOG_LINES = 1000;
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+
+    Path rootdir = FSUtils.getRootDir(conf);
+
+    Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+        "table", "family", 40);
+    try {
+      TableName table = t.getName();
+      List<HRegionInfo> regions = null;
+      HRegionServer hrs = null;
+      for (int i = 0; i < NUM_RS; i++) {
+        boolean foundRs = false;
+        hrs = rsts.get(i).getRegionServer();
+        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        for (HRegionInfo region : regions) {
+          if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
+            foundRs = true;
+            break;
+          }
+        }
+        if (foundRs) break;
+      }
+      final Path logDir = new Path(rootdir, DefaultWALProvider.getWALDirectoryName(hrs
+          .getServerName().toString()));
+
+      LOG.info("#regions = " + regions.size());
+      Iterator<HRegionInfo> it = regions.iterator();
+      while (it.hasNext()) {
+        HRegionInfo region = it.next();
+        if (region.getTable().getNamespaceAsString()
+            .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
+          it.remove();
+        }
+      }
+
+      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
+
+      slm.splitLogDistributed(logDir);
+
+      int count = 0;
+      for (HRegionInfo hri : regions) {
+
+        Path tdir = FSUtils.getTableDir(rootdir, table);
+        Path editsdir =
+            WALSplitter.getRegionDirRecoveredEditsDir(
+                HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        LOG.debug("checking edits dir " + editsdir);
+        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (WALSplitter.isSequenceIdFile(p)) {
+              return false;
+            }
+            return true;
+          }
+        });
+        assertTrue(
+            "edits dir should have more than a single file in it. instead has " + files.length,
+            files.length > 1);
+        for (int i = 0; i < files.length; i++) {
+          int c = countWAL(files[i].getPath(), fs, conf);
+          count += c;
+        }
+        LOG.info(count + " edits in " + files.length + " recovered edits files.");
+      }
+
+      // check that the log file is moved
+      assertFalse(fs.exists(logDir));
+
+      assertEquals(NUM_LOG_LINES, count);
+    } finally {
+      if (t != null) t.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testLogReplayWithNonMetaRSDown() throws Exception {
+    LOG.info("testLogReplayWithNonMetaRSDown");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      HRegionServer hrs = findRSToKill(false, "table");
+      List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
+
+      // wait for the abort to complete
+      this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
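+  /**
+   * Nonce generator that records every nonce it hands out; after startDups() it
+   * replays the recorded nonces instead, so the same operations can be
+   * re-submitted as duplicates during recovery.
+   */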
+  private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
+    private boolean isDups = false;
+    private LinkedList<Long> nonces = new LinkedList<Long>();
+
+    public void startDups() {
+      isDups = true;
+    }
+
+    @Override
+    public long newNonce() {
+      long nonce = isDups ? nonces.removeFirst() : super.newNonce();
+      if (!isDups) {
+        nonces.add(nonce);
+      }
+      return nonce;
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testNonceRecovery() throws Exception {
+    LOG.info("testNonceRecovery");
+    final String TABLE_NAME = "table";
+    final String FAMILY_NAME = "family";
+    final int NUM_REGIONS_TO_CREATE = 40;
+
+    conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    master.balanceSwitch(false);
+
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
+    NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
+    NonceGenerator oldNg =
+        ConnectionUtils.injectNonceGeneratorForTesting(
+            (ClusterConnection)TEST_UTIL.getConnection(), ng);
+
+    try {
+      List<Increment> reqs = new ArrayList<Increment>();
+      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+        HRegionServer hrs = rst.getRegionServer();
+        List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        for (HRegionInfo hri : hris) {
+          if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
+            byte[] key = hri.getStartKey();
+            if (key == null || key.length == 0) {
+              key = Bytes.copy(hri.getEndKey());
+              --(key[key.length - 1]);
+            }
+            Increment incr = new Increment(key);
+            incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
+            ht.increment(incr);
+            reqs.add(incr);
+          }
+        }
+      }
+
+      HRegionServer hrs = findRSToKill(false, "table");
+      abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+      ng.startDups();
+      for (Increment incr : reqs) {
+        try {
+          ht.increment(incr);
+          fail("should have thrown");
+        } catch (OperationConflictException ope) {
+          LOG.debug("Caught as expected: " + ope.getMessage());
+        }
+      }
+    } finally {
+      ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)
+          TEST_UTIL.getConnection(), oldNg);
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testLogReplayWithMetaRSDown() throws Exception {
+    LOG.info("testRecoveredEditsReplayWithMetaRSDown");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      HRegionServer hrs = findRSToKill(true, "table");
+      List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
+
+      this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  private void abortRSAndVerifyRecovery(HRegionServer hrs, Table ht, final ZooKeeperWatcher zkw,
+      final int numRegions, final int numofLines) throws Exception {
+
+    abortRSAndWaitForRecovery(hrs, zkw, numRegions);
+    assertEquals(numofLines, TEST_UTIL.countRows(ht));
+  }
+
+  private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
+      final int numRegions) throws Exception {
+    final MiniHBaseCluster tmpCluster = this.cluster;
+
+    // abort RS
+    LOG.info("Aborting region server: " + hrs.getServerName());
+    hrs.abort("testing");
+
+    // wait for the abort to complete
+    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+      }
+    });
+
+    // wait for regions to come back online
+    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (HBaseTestingUtility.getAllOnlineRegions(tmpCluster).size()
+            >= (numRegions + 1));
+      }
+    });
+
+    // wait for all regions to be fully recovered
+    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+            zkw.recoveringRegionsZNode, false);
+        return (recoveringRegions != null && recoveringRegions.size() == 0);
+      }
+    });
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
+    LOG.info("testMasterStartsUpWithLogSplittingWork");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
+    startCluster(NUM_RS);
+
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      HRegionServer hrs = findRSToKill(false, "table");
+      List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
+
+      // abort master
+      abortMaster(cluster);
+
+      // abort RS
+      LOG.info("Aborting region server: " + hrs.getServerName());
+      hrs.abort("testing");
+
+      // wait for the abort to complete
+      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+        }
+      });
+
+      Thread.sleep(2000);
+      LOG.info("Current Open Regions:"
+          + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
+
+      // wait for regions to come back online
+      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
+              >= (NUM_REGIONS_TO_CREATE + 1));
+        }
+      });
+
+      LOG.info("Current Open Regions After Master Node Starts Up:"
+          + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
+
+      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testMasterStartsUpWithLogReplayWork() throws Exception {
+    LOG.info("testMasterStartsUpWithLogReplayWork");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
+    startCluster(NUM_RS);
+
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      HRegionServer hrs = findRSToKill(false, "table");
+      List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
+
+      // abort master
+      abortMaster(cluster);
+
+      // abort RS
+      LOG.info("Aborting region server: " + hrs.getServerName());
+      hrs.abort("testing");
+
+      // wait for the RS to die
+      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+        }
+      });
+
+      Thread.sleep(2000);
+      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
+
+      // wait for all regions to be fully recovered
+      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+              zkw.recoveringRegionsZNode, false);
+          boolean done = recoveringRegions != null && recoveringRegions.size() == 0;
+          if (!done) {
+            LOG.info("Recovering regions: " + recoveringRegions);
+          }
+          return done;
+        }
+      });
+
+      LOG.info("Current Open Regions After Master Node Starts Up:"
+          + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
+
+      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testLogReplayTwoSequentialRSDown() throws Exception {
+    LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      List<HRegionInfo> regions = null;
+      HRegionServer hrs1 = findRSToKill(false, "table");
+      regions = ProtobufUtil.getOnlineRegions(hrs1.getRSRpcServices());
+
+      makeWAL(hrs1, regions, "table", "family", NUM_LOG_LINES, 100);
+
+      // abort RS1
+      LOG.info("Aborting region server: " + hrs1.getServerName());
+      hrs1.abort("testing");
+
+      // wait for the abort to complete
+      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+        }
+      });
+
+      // wait for regions to come back online
+      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
+              >= (NUM_REGIONS_TO_CREATE + 1));
+        }
+      });
+
+      // sleep a little so we interrupt the recovery partway through
+      Thread.sleep(300);
+      // abort second region server
+      rsts = cluster.getLiveRegionServerThreads();
+      HRegionServer hrs2 = rsts.get(0).getRegionServer();
+      LOG.info("Aborting one more region server: " + hrs2.getServerName());
+      hrs2.abort("testing");
+
+      // wait for the abort to complete
+      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
+        }
+      });
+
+      // wait for regions to come back online
+      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
+              >= (NUM_REGIONS_TO_CREATE + 1));
+        }
+      });
+
+      // wait for all regions to be fully recovered
+      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+              zkw.recoveringRegionsZNode, false);
+          return (recoveringRegions != null && recoveringRegions.size() == 0);
+        }
+      });
+
+      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testMarkRegionsRecoveringInZK() throws Exception {
+    LOG.info("testMarkRegionsRecoveringInZK");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    master.balanceSwitch(false);
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = master.getZooKeeper();
+    Table ht = installTable(zkw, "table", "family", 40);
+    try {
+      final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+
+      Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
+      HRegionInfo region = null;
+      HRegionServer hrs = null;
+      ServerName firstFailedServer = null;
+      ServerName secondFailedServer = null;
+      for (int i = 0; i < NUM_RS; i++) {
+        hrs = rsts.get(i).getRegionServer();
+        List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        if (regions.isEmpty()) continue;
+        region = regions.get(0);
+        regionSet.add(region);
+        firstFailedServer = hrs.getServerName();
+        secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
+        break;
+      }
+
+      slm.markRegionsRecovering(firstFailedServer, regionSet);
+      slm.markRegionsRecovering(secondFailedServer, regionSet);
+
+      List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
+          ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
+
+      assertEquals(2, recoveringRegions.size());
+
+      // wait for splitLogWorker to clear them, because there are no WAL files recorded in ZK
+      final HRegionServer tmphrs = hrs;
+      TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (tmphrs.getRecoveringRegions().size() == 0);
+        }
+      });
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testReplayCmd() throws Exception {
+    LOG.info("testReplayCmd");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      List<HRegionInfo> regions = null;
+      HRegionServer hrs = null;
+      for (int i = 0; i < NUM_RS; i++) {
+        boolean isCarryingMeta = false;
+        hrs = rsts.get(i).getRegionServer();
+        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        for (HRegionInfo region : regions) {
+          if (region.isMetaRegion()) {
+            isCarryingMeta = true;
+            break;
+          }
+        }
+        if (isCarryingMeta) {
+          continue;
+        }
+        if (regions.size() > 0) break;
+      }
+
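+      // load puts and deletes so that the checksum below covers both kinds of edits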
+      this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
+      String originalCheckSum = TEST_UTIL.checksumRows(ht);
+
+      // abort the RS and trigger replay
+      abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+      assertEquals("Data should remain after reopening of regions", originalCheckSum,
+          TEST_UTIL.checksumRows(ht));
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testLogReplayForDisablingTable() throws Exception {
+    LOG.info("testLogReplayForDisablingTable");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);
+    try {
+      // turn off load balancing to prevent regions from moving around; otherwise
+      // they will consume recovered.edits
+      master.balanceSwitch(false);
+
+      List<HRegionInfo> regions = null;
+      HRegionServer hrs = null;
+      boolean hasRegionsForBothTables = false;
+      String tableName = null;
+      for (int i = 0; i < NUM_RS; i++) {
+        tableName = null;
+        hasRegionsForBothTables = false;
+        boolean isCarryingSystem = false;
+        hrs = rsts.get(i).getRegionServer();
+        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        for (HRegionInfo region : regions) {
+          if (region.getTable().isSystemTable()) {
+            isCarryingSystem = true;
+            break;
+          }
+          if (tableName != null &&
+              !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
+            // make sure we find a RS that has online regions for both "table" and "disableTable"
+            hasRegionsForBothTables = true;
+            break;
+          } else if (tableName == null) {
+            tableName = region.getTable().getNameAsString();
+          }
+        }
+        if (isCarryingSystem) {
+          continue;
+        }
+        if (hasRegionsForBothTables) {
+          break;
+        }
+      }
+
+      // make sure we found a good RS
+      Assert.assertTrue(hasRegionsForBothTables);
+
+      LOG.info("#regions = " + regions.size());
+      Iterator<HRegionInfo> it = regions.iterator();
+      while (it.hasNext()) {
+        HRegionInfo region = it.next();
+        if (region.isMetaTable()) {
+          it.remove();
+        }
+      }
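+      // write WAL edits for both tables; after recovery, the disabled table's edits
+      // should stay in recovered.edits while the enabled table's edits get replayed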
+      makeWAL(hrs, regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
+      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
+
+      LOG.info("Disabling table\n");
+      TEST_UTIL.getHBaseAdmin().disableTable(TableName.valueOf("disableTable"));
+      TEST_UTIL.waitTableDisabled(TableName.valueOf("disableTable").getName());
+
+      // abort RS
+      LOG.info("Aborting region server: " + hrs.getServerName());
+      hrs.abort("testing");
+
+      // wait for the abort to complete
+      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+        }
+      });
+
+      // wait for regions to come online
+      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
+              >= (NUM_REGIONS_TO_CREATE + 1));
+        }
+      });
+
+      // wait until all regions are fully recovered
+      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+              zkw.recoveringRegionsZNode, false);
+          ServerManager serverManager = master.getServerManager();
+          return (!serverManager.areDeadServersInProgress() &&
+              recoveringRegions != null && recoveringRegions.size() == 0);
+        }
+      });
+
+      int count = 0;
+      FileSystem fs = master.getMasterFileSystem().getFileSystem();
+      Path rootdir = FSUtils.getRootDir(conf);
+      Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
+      for (HRegionInfo hri : regions) {
+        Path editsdir =
+            WALSplitter.getRegionDirRecoveredEditsDir(
+                HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        LOG.debug("checking edits dir " + editsdir);
+        if (!fs.exists(editsdir)) continue;
+        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (WALSplitter.isSequenceIdFile(p)) {
+              return false;
+            }
+            return true;
+          }
+        });
+        if (files != null) {
+          for (FileStatus file : files) {
+            int c = countWAL(file.getPath(), fs, conf);
+            count += c;
+            LOG.info(c + " edits in " + file.getPath());
+          }
+        }
+      }
+
+      LOG.info("Verify edits in recovered.edits files");
+      assertEquals(NUM_LOG_LINES, count);
+      LOG.info("Verify replayed edits");
+      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+
+      // clean up
+      for (HRegionInfo hri : regions) {
+        Path editsdir =
+            WALSplitter.getRegionDirRecoveredEditsDir(
+                HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        fs.delete(editsdir, true);
+      }
+    } finally {
+      if (disablingHT != null) disablingHT.close();
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testDisallowWritesInRecovering() throws Exception {
+    LOG.info("testDisallowWritesInRecovering");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
+    startCluster(NUM_RS);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+
+      Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
+      HRegionInfo region = null;
+      HRegionServer hrs = null;
+      HRegionServer dstRS = null;
+      for (int i = 0; i < NUM_RS; i++) {
+        hrs = rsts.get(i).getRegionServer();
+        List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        if (regions.isEmpty()) continue;
+        region = regions.get(0);
+        regionSet.add(region);
+        dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
+        break;
+      }
+
+      slm.markRegionsRecovering(hrs.getServerName(), regionSet);
+      // move the region so that it is opened in recovering state
+      final HRegionInfo hri = region;
+      final HRegionServer tmpRS = dstRS;
+      TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(),
+          Bytes.toBytes(dstRS.getServerName().getServerName()));
+      // wait for the region move to complete
+      final RegionStates regionStates =
+          TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+      TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          ServerName sn = regionStates.getRegionServerOfRegion(hri);
+          return (sn != null && sn.equals(tmpRS.getServerName()));
+        }
+      });
+
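+      // a put against the recovering region should fail with RegionInRecoveryException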
+      try {
+        byte[] key = region.getStartKey();
+        if (key == null || key.length == 0) {
+          key = new byte[] { 0, 0, 0, 0, 1 };
+        }
+        Put put = new Put(key);
+        put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
+        ht.put(put);
+        fail("The put should have failed while the region is in recovering state");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
+        RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
+        boolean foundRegionInRecoveryException = false;
+        for (Throwable t : re.getCauses()) {
+          if (t instanceof RegionInRecoveryException) {
+            foundRegionInRecoveryException = true;
+            break;
+          }
+        }
+        Assert.assertTrue(
+            "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
+            foundRegionInRecoveryException);
+      }
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  /**
+   * The original intention of this test was to force an abort of a region
+   * server and to make sure that the failure path in the region servers is
+   * properly evaluated. But it is difficult to ensure that the region server
+   * doesn't finish the log splitting before it aborts. Also, there is now
+   * a code path where the master will preempt the region server when the
+   * master detects that the region server has aborted.
+   * @throws Exception
+   */
+  @Ignore ("Disabled because flakey") @Test (timeout=300000)
+  public void testWorkerAbort() throws Exception {
+    LOG.info("testWorkerAbort");
+    startCluster(3);
+    final int NUM_LOG_LINES = 10000;
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+    final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs = findRSToKill(false, "table");
+    Path rootdir = FSUtils.getRootDir(conf);
+    final Path logDir = new Path(rootdir,
+        DefaultWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
+
+    Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+        "table", "family", 40);
+    try {
+      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()),
+          "table", "family", NUM_LOG_LINES, 100);
+
+      new Thread() {
+        @Override
+        public void run() {
+          waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+          for (RegionServerThread rst : rsts) {
+            rst.getRegionServer().abort("testing");
+            break;
+          }
+        }
+      }.start();
+      // slm.splitLogDistributed(logDir);
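+      // enqueue a single split task directly (rather than splitting the whole dir)
+      // so the worker counters below move as soon as that one task is handled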
+      FileStatus[] logfiles = fs.listStatus(logDir);
+      TaskBatch batch = new TaskBatch();
+      slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
+      // like waitForCounter, but accept any one of the five worker counters moving
+      long curt = System.currentTimeMillis();
+      long waitTime = 80000;
+      long endt = curt + waitTime;
+      while (curt < endt) {
+        if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+            tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
+            tot_wkr_preempt_task.get()) == 0) {
+          Thread.yield();
+          curt = System.currentTimeMillis();
+        } else {
+          assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+              tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
+              tot_wkr_preempt_task.get()));
+          return;
+        }
+      }
+      fail("none of the following counters went up in " + waitTime +
+          " milliseconds - " +
+          "tot_wkr_task_resigned, tot_wkr_task_err, " +
+          "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
+          "tot_wkr_preempt_task");
+    } finally {
+      if (t != null) t.close();
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testThreeRSAbort() throws Exception {
+    LOG.info("testThreeRSAbort");
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_ROWS_PER_REGION = 100;
+
+    startCluster(NUM_RS); // NUM_RS=6.
+
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
+        "distributed log splitting test", null);
+
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      populateDataInTable(NUM_ROWS_PER_REGION, "family");
+
+      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+      assertEquals(NUM_RS, rsts.size());
+      rsts.get(0).getRegionServer().abort("testing");
+      rsts.get(1).getRegionServer().abort("testing");
+      rsts.get(2).getRegionServer().abort("testing");
+
+      long start = EnvironmentEdgeManager.currentTime();
+      while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
+        if (EnvironmentEdgeManager.currentTime() - start > 60000) {
+          fail("Timed out waiting for aborted region servers to go down");
+        }
+        Thread.sleep(200);
+      }
+
+      start = EnvironmentEdgeManager.currentTime();
+      while (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
+          < (NUM_REGIONS_TO_CREATE + 1)) {
+        if (EnvironmentEdgeManager.currentTime() - start > 60000) {
+          assertTrue("Timedout", false);
+        }
+        Thread.sleep(200);
+      }
+
+      // wait until all regions are fully recovered
+      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+              zkw.recoveringRegionsZNode, false);
+          return (recoveringRegions != null && recoveringRegions.size() == 0);
+        }
+      });
+
+      assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
+          TEST_UTIL.countRows(ht));
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testDelayedDeleteOnFailure() throws Exception {
+    LOG.info("testDelayedDeleteOnFailure");
+    startCluster(1);
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+    final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
+    fs.mkdirs(logDir);
+    ExecutorService executor = null;
+    try {
+      final Path corruptedLogFile = new Path(logDir, "x");
+      FSDataOutputStream out = fs.create(corruptedLogFile);
+      out.write(0);
+      out.write(Bytes.toBytes("corrupted bytes"));
+      out.close();
+      ZKSplitLogManagerCoordination coordination =
+          (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
+              .getCoordinatedStateManager()).getSplitLogManagerCoordination();
+      coordination.setIgnoreDeleteForTesting(true);
+      executor = Executors.newSingleThreadExecutor();
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // the logDir contains a fake, corrupted log, so the split log worker
+            // will finish it quickly with an error, and this call will fail and
+            // throw an IOException
+            slm.splitLogDistributed(logDir);
+          } catch (IOException ioe) {
+            try {
+              assertTrue(fs.exists(corruptedLogFile));
+              // this call blocks waiting for the task to be removed from the
+              // tasks map, which will not happen while ignoreZKDeleteForTesting
+              // is set to true, so it returns only once the thread is interrupted
+              slm.splitLogDistributed(logDir);
+            } catch (IOException e) {
+              assertTrue(Thread.currentThread().isInterrupted());
+              return;
+            }
+            fail("did not get the expected IOException from the 2nd call");
+          }
+          fail("did not get the expected IOException from the 1st call");
+        }
+      };
+      Future<?> result = executor.submit(runnable);
+      try {
+        result.get(2000, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException te) {
+        // it is ok, expected.
+      }
+      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
+      executor.shutdownNow();
+      executor = null;
+
+      // make sure the runnable is finished with no exception thrown.
+      result.get();
+    } finally {
+      if (executor != null) {
+        // interrupt the thread in case the test fails in the middle.
+        // it has no effect if the thread is already terminated.
+        executor.shutdownNow();
+      }
+      fs.delete(logDir, true);
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testMetaRecoveryInZK() throws Exception {
+    LOG.info("testMetaRecoveryInZK");
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+
+    // we only test meta recovery as recorded in ZK
+    HRegionServer hrs = findRSToKill(true, null);
+    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+
+    LOG.info("#regions = " + regions.size());
+    Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
+    tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
+    master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
+    Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
+    userRegionSet.addAll(regions);
+    master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
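+    // both prepareLogReplay calls should have registered their regions under the
+    // recovering-regions znode; verify that the meta region is among them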
+    boolean isMetaRegionInRecovery = false;
+    List<String> recoveringRegions =
+        zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
+    for (String curEncodedRegionName : recoveringRegions) {
+      if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        isMetaRegionInRecovery = true;
+        break;
+      }
+    }
+    assertTrue(isMetaRegionInRecovery);
+
+    master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
+
+    isMetaRegionInRecovery = false;
+    recoveringRegions =
+        zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
+    for (String curEncodedRegionName : recoveringRegions) {
+      if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        isMetaRegionInRecovery = true;
+        break;
+      }
+    }
+    // meta region should be recovered
+    assertFalse(isMetaRegionInRecovery);
+    zkw.close();
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testSameVersionUpdatesRecovery() throws Exception {
+    LOG.info("testSameVersionUpdatesRecovery");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    startCluster(NUM_RS);
+    final AtomicLong sequenceId = new AtomicLong(100);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      List<HRegionInfo> regions = null;
+      HRegionServer hrs = null;
+      for (int i = 0; i < NUM_RS; i++) {
+        boolean isCarryingMeta = false;
+        hrs = rsts.get(i).getRegionServer();
+        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        for (HRegionInfo region : regions) {
+          if (region.isMetaRegion()) {
+            isCarryingMeta = true;
+            break;
+          }
+        }
+        if (isCarryingMeta) {
+          continue;
+        }
+        break;
+      }
+
+      LOG.info("#regions = " + regions.size());
+      Iterator<HRegionInfo> it = regions.iterator();
+      while (it.hasNext()) {
+        HRegionInfo region = it.next();
+        if (region.isMetaTable()
+            || region.getEncodedName().equals(
+            HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+          it.remove();
+        }
+      }
+      if (regions.size() == 0) return;
+      HRegionInfo curRegionInfo = regions.get(0);
+      byte[] startRow = curRegionInfo.getStartKey();
+      if (startRow == null || startRow.length == 0) {
+        startRow = new byte[] { 0, 0, 0, 0, 1 };
+      }
+      byte[] row = Bytes.incrementBytes(startRow, 1);
+      // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+      row = Arrays.copyOfRange(row, 3, 8);
+      long value = 0;
+      TableName tableName = TableName.valueOf("table");
+      byte[] family = Bytes.toBytes("family");
+      byte[] qualifier = Bytes.toBytes("c1");
+      long timeStamp = System.currentTimeMillis();
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      htd.addFamily(new HColumnDescriptor(family));
+      final WAL wal = hrs.getWAL(curRegionInfo);
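+      // every edit below targets the same cell with the same timestamp ("same version");
+      // after replay, the last value written must be the one that survives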
+      for (int i = 0; i < NUM_LOG_LINES; i += 1) {
+        WALEdit e = new WALEdit();
+        value++;
+        e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
+        wal.append(htd, curRegionInfo,
+            new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
+            e, true);
+      }
+      wal.sync();
+      wal.shutdown();
+
+      // abort the RS and wait for recovery to complete
+      this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+      // verify we got the last value
+      LOG.info("Verification Starts...");
+      Get g = new Get(row);
+      Result r = ht.get(g);
+      long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+      assertEquals(value, theStoredVal);
+
+      // after flush
+      LOG.info("Verification after flush...");
+      TEST_UTIL.getHBaseAdmin().flush(tableName);
+      r = ht.get(g);
+      theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+      assertEquals(value, theStoredVal);
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
+    LOG.info("testSameVersionUpdatesRecoveryWithWrites");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
+    conf.setInt("hbase.hstore.compactionThreshold", 3);
+    startCluster(NUM_RS);
+    final AtomicLong sequenceId = new AtomicLong(100);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 2000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+    try {
+      List<HRegionInfo> regions = null;
+      HRegionServer hrs = null;
+      for (int i = 0; i < NUM_RS; i++) {
+        boolean isCarryingMeta = false;
+        hrs = rsts.get(i).getRegionServer();
+        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+        for (HRegionInfo region : regions) {
+          if (region.isMetaRegion()) {
+            isCarryingMeta = true;
+            break;
+          }
+        }
+        if (isCarryingMeta) {
+          continue;
+        }
+        break;
+      }
+
+      LOG.info("#regions = " + regions.size());
+      Iterator<HRegionInfo> it = regions.iterator();
+      while (it.hasNext()) {
+        HRegionInfo region = it.next();
+        if (region.isMetaTable()
+            || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+          it.remove();
+        }
+      }
+      if (regions.size() == 0) return;
+      HRegionInfo curRegionInfo = regions.get(0);
+      byte[] startRow = curRegionInfo.getStartKey();
+      if (startRow == null || startRow.length == 0) {
+        startRow = new byte[] { 0, 0, 0, 0, 1 };
+      }
+      byte[] row = Bytes.incrementBytes(startRow, 1);
+      // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+      row = Arrays.copyOfRange(row, 3, 8);
+      long value = 0;
+      final TableName tableName = TableName.valueOf("table");
+      byte[] family = Bytes.toBytes("family");
+      byte[] qualifier = Bytes.toBytes("c1");
+      long timeStamp = System.currentTimeMillis();
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      htd.addFamily(new HColumnDescriptor(family));
+      final WAL wal = hrs.getWAL(curRegionInfo);
+      for (int i = 0; i < NUM_LOG_LINES; i += 1) {
+        WALEdit e = new WALEdit();
+        value++;
+        e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
+        wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
+            tableName, System.currentTimeMillis()), e, true);
+      }
+      wal.sync();
+      wal.shutdown();
+
+      // abort the RS and wait for recovery to complete
+      this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+      // verify we got the last value
+      LOG.info("Verification Starts...");
+      Get g = new Get(row);
+      Result r = ht.get(g);
+      long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+      assertEquals(value, theStoredVal);
+
+      // after flush & compaction
+      LOG.info("Verification after flush...");
+      TEST_UTIL.getHBaseAdmin().flush(tableName);
+      TEST_UTIL.getHBaseAdmin().compact(tableName);
+
+      // wait for the compaction to complete
+      TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
+        }
+      });
+
+      r = ht.get(g);
+      theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+      assertEquals(value, theStoredVal);
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testReadWriteSeqIdFiles() throws Exception {
+    LOG.info("testReadWriteSeqIdFiles");
+    startCluster(2);
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    Table ht = installTable(zkw, "table", "family", 10);
+    try {
+      FileSystem fs = master.getMasterFileSystem().getFileSystem();
+      Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table"));
+      List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
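+      // when the requested seqid is below the stored one, each call bumps the
+      // persisted seqid by the 1000L "safety bumper"; hence newSeqId + 2000 below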
+      long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
+      WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
+      assertEquals(newSeqId + 2000,
+          WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
+
+      Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
+      FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          return WALSplitter.isSequenceIdFile(p);
+        }
+      });
+      // only one seqid file should exist
+      assertEquals(1, files.length);
+
+      // verify that seqId files are not treated as recovered.edits files
+      NavigableSet<Path> recoveredEdits =
+          WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0));
+      assertEquals(0, recoveredEdits.size());
+    } finally {
+      if (ht != null) ht.close();
+      if (zkw != null) zkw.close();
+    }
+  }
+
+  Table installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
+    return installTable(zkw, tname, fname, nrs, 0);
+  }
+
+  Table installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs,
+      int existingRegions) throws Exception {
+    // Create a table with regions
+    TableName table = TableName.valueOf(tname);
+    byte [] family = Bytes.toBytes(fname);
+    LOG.info("Creating table with " + nrs + " regions");
+    Table ht = TEST_UTIL.createMultiRegionTable(table, family, nrs);
+    int numRegions = -1;
+    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(table)) {
+      numRegions = r.getStartKeys().length;
+    }
+    assertEquals(nrs, numRegions);
+    LOG.info("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    // disable-enable cycle to get rid of table's dead regions left behind
+    // by createMultiRegions
+    LOG.debug("Disabling table\n");
+    TEST_UTIL.getHBaseAdmin().disableTable(table);
+    LOG.debug("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
+    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
+    if (regions.size() != 2) {
+      for (String oregion : regions)
+        LOG.debug("Region still online: " + oregion);
+    }
+    assertEquals(2 + existingRegions, regions.size());
+    LOG.debug("Enabling table\n");
+    TEST_UTIL.getHBaseAdmin().enableTable(table);
+    LOG.debug("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
+    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
+    assertEquals(numRegions + 2 + existingRegions, regions.size());
+    return ht;
+  }
+
+  void populateDataInTable(int nrows, String fname) throws Exception {
+    byte [] family = Bytes.toBytes(fname);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    assertEquals(NUM_RS, rsts.size());
+
+    for (RegionServerThread rst : rsts) {
+      HRegionServer hrs = rst.getRegionServer();
+      List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      for (HRegionInfo hri : hris) {
+        if (hri.getTable().isSystemTable()) {
+          continue;
+        }
+        LOG.debug("adding data to rs = " + rst.getName() +
+            " region = "+ hri.getRegionNameAsString());
+        Region region = hrs.getOnlineRegion(hri.getRegionName());
+        assertTrue(region != null);
+        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
+      }
+    }
+
+    for (MasterThread mt : cluster.getLiveMasterThreads()) {
+      HRegionServer hrs = mt.getMaster();
+      List<HRegionInfo> hris;
+      try {
+        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      } catch (ServerNotRunningYetException e) {
+        // It's ok: this master may be a backup. Ignored.
+        continue;
+      }
+      for (HRegionInfo hri : hris) {
+        if (hri.getTable().isSystemTable()) {
+          continue;
+        }
+        LOG.debug("adding data to rs = " + mt.getName() +
+            " region = "+ hri.getRegionNameAsString());
+        Region region = hrs.getOnlineRegion(hri.getRegionName());
+        assertTrue(region != null);
+        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
+      }
+    }
+  }
+
+  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
+      int num_edits, int edit_size) throws IOException {
+    makeWAL(hrs, regions, tname, fname, num_edits, edit_size, true);
+  }
+
+  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
+      int num_edits, int edit_size, boolean cleanShutdown) throws IOException {
+    TableName fullTName = TableName.valueOf(tname);
+    // remove the meta region
+    regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
+    // using one sequenceId for edits across all regions is ok.
+    final AtomicLong sequenceId = new AtomicLong(10);
+
+    for (Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext();) {
+      HRegionInfo regionInfo = iter.next();
+      if (regionInfo.getTable().isSystemTable()) {
+        iter.remove();
+      }
+    }
+    HTableDescriptor htd = new HTableDescriptor(fullTName);
+    byte[] family = Bytes.toBytes(fname);
+    htd.addFamily(new HColumnDescriptor(family));
+    byte[] value = new byte[edit_size];
+
+    List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
+    for (HRegionInfo region : regions) {
+      if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
+        continue;
+      }
+      hris.add(region);
+    }
+    LOG.info("Creating wal edits across " + hris.size() + " regions.");
+    for (int i = 0; i < edit_size; i++) {
+      value[i] = (byte) ('a' + (i % 26));
+    }
+    int n = hris.size();
+    int[] counts = new int[n];
+    // sync every ~30k to line up with desired wal rolls
+    final int syncEvery = 30 * 1024 / edit_size;
+    if (n > 0) {
+      for (int i = 0; i < num_edits; i += 1) {
+        WALEdit e = new WALEdit();
+        HRegionInfo curRegionInfo = hris.get(i % n);
+        final WAL log = hrs.getWAL(curRegionInfo);
+        byte[] startRow = curRegionInfo.getStartKey();
+        if (startRow == null || startRow.length == 0) {
+          startRow = new byte[] { 0, 0, 0, 0, 1 };
+        }
+        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
+        // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+        row = Arrays.copyOfRange(row, 3, 8);
+        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+        e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
+        log.append(htd, curRegionInfo,
+            new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
+                System.currentTimeMillis()), e, true);
+        if (0 == i % syncEvery) {
+          log.sync();
+        }
+        counts[i % n] += 1;
+      }
+    }
+    // done as two passes because the regions might share WALs; shutdown is idempotent,
+    // but syncing after shutdown would cause errors
+    for (HRegionInfo info : hris) {
+      final WAL log = hrs.getWAL(info);
+      log.sync();
+    }
+    if (cleanShutdown) {
+      for (HRegionInfo info : hris) {
+        final WAL log = hrs.getWAL(info);
+        log.shutdown();
+      }
+    }
+    for (int i = 0; i < n; i++) {
+      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
+    }
+  }
+
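+  /**
+   * Counts the edits in the given WAL file, skipping entries whose first cell
+   * belongs to the WAL metadata family (e.g. compaction markers).
+   */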
+  private int countWAL(Path log, FileSystem fs, Configuration conf)
+      throws IOException {
+    int count = 0;
+    WAL.Reader in = WALFactory.createReader(fs, log, conf);
+    try {
+      WAL.Entry e;
+      while ((e = in.next()) != null) {
+        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
+          count++;
+        }
+      }
+    } finally {
+      try {
+        in.close();
+      } catch (IOException exception) {
+        LOG.warn("Problem closing wal: " + exception.getMessage());
+        LOG.debug("exception details.", exception);
+      }
+    }
+    return count;
+  }
+
+  private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master) throws Exception {
+    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
+  }
+
+  private void putData(Region region, byte[] startRow, int numRows, byte [] qf,
+      byte [] ...families)
+      throws IOException {
+    for (int i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
+      for (byte[] family : families) {
+        put.addColumn(family, qf, null);
+      }
+      region.put(put);
+    }
+  }
+
+  /**
+   * Load the table with puts and deletes of expected values so that we can verify them later
+   */
+  private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException {
+    byte[] k = new byte[3];
+
+    // add puts
+    List<Put> puts = new ArrayList<>();
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+          k[0] = b1;
+          k[1] = b2;
+          k[2] = b3;
+          Put put = new Put(k);
+          put.addColumn(f, column, k);
+          puts.add(put);
+        }
+      }
+    }
+    t.put(puts);
+    // add deletes
+    for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+      k[0] = 'a';
+      k[1] = 'a';
+      k[2] = b3;
+      Delete del = new Delete(k);
+      t.delete(del);
+    }
+  }
+
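+  /**
+   * Spins until {@code ctr} moves off {@code oldval}, then asserts it equals
+   * {@code newval}; fails if the counter does not move within {@code timems}.
+   */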
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    long curt = System.currentTimeMillis();
+    long endt = curt + timems;
+    while (curt < endt) {
+      if (ctr.get() == oldval) {
+        Thread.yield();
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(newval, ctr.get());
+        return;
+      }
+    }
+    fail("counter did not move off " + oldval + " within " + timems + " ms");
+  }
+
+  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
+    for (MasterThread mt : cluster.getLiveMasterThreads()) {
+      if (mt.getMaster().isActiveMaster()) {
+        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
+        mt.join();
+        break;
+      }
+    }
+    LOG.debug("Master is aborted");
+  }
+
+  /**
+   * Find a RS that has regions of a table.
+   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
+   * @param tableName the table whose regions the returned RS must host; null matches any table
+   * @return the region server to kill
+   * @throws Exception
+   */
+  private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+
+    for (RegionServerThread rst: rsts) {
+      hrs = rst.getRegionServer();
+      while (rst.isAlive() && !hrs.isOnline()) {
+        Thread.sleep(100);
+      }
+      if (!rst.isAlive()) {
+        continue;
+      }
+      boolean isCarryingMeta = false;
+      boolean foundTableRegion = false;
+      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+        }
+        if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
+          foundTableRegion = true;
+        }
+        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
+          break;
+        }
+      }
+      if (isCarryingMeta && hasMetaRegion) {
+        // clients ask for a RS with META
+        if (!foundTableRegion) {
+          final HRegionServer destRS = hrs;
+          // the RS doesn't have regions of the specified table, so we need to move one to it
+          List<HRegionInfo> tableRegions =
+              TEST_UTIL.getHBaseAdmin().getTableRegions(TableName.valueOf(tableName));
+          final HRegionInfo hri = tableRegions.get(0);
+          TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
+              Bytes.toBytes(destRS.getServerName().getServerName()));
+          // wait for the region move to complete
+          final RegionStates regionStates =
+              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
+            @Override
+            public boolean evaluate() throws Exception {
+              ServerName sn = regionStates.getRegionServerOfRegion(hri);
+              return (sn != null && sn.equals(destRS.getServerName()));
+            }
+          });
+        }
+        return hrs;
+      } else if (hasMetaRegion || isCarryingMeta) {
+        continue;
+      }
+      if (foundTableRegion) break;
+    }
+
+    return hrs;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
new file mode 100644
index 0000000..395eef2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({FlakeyTests.class, MediumTests.class})
+public class TestStochasticLoadBalancer2 extends BalancerTestBase {
+  private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer2.class);
+
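+  // Each test below mocks a cluster shape (nodes, regions, regions per server,
+  // replication, tables) and hands it to BalancerTestBase.testWithCluster; the two
+  // trailing booleans appear to toggle the balance/replica-placement assertions
+  // (their exact semantics live in BalancerTestBase, which is not part of this diff).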
+  @Test (timeout = 800000)
+  public void testRegionReplicasOnMidCluster() {
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    loadBalancer.setConf(conf);
+    int numNodes = 200;
+    int numRegions = 40 * numNodes;
+    int replication = 3; // 3 replicas per region
+    int numRegionsPerServer = 30; //all regions are mostly balanced
+    int numTables = 10;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
+  }
+
+  @Test (timeout = 800000)
+  public void testRegionReplicasOnLargeCluster() {
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    loadBalancer.setConf(conf);
+    int numNodes = 1000;
+    int numRegions = 20 * numNodes; // 20 * replication regions per RS
+    int numRegionsPerServer = 19; // all servers except one
+    int numTables = 100;
+    int replication = 3;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
+  }
+
+  @Test (timeout = 800000)
+  public void testRegionReplicasOnMidClusterHighReplication() {
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    loadBalancer.setConf(conf);
+    int numNodes = 80;
+    int numRegions = 6 * numNodes;
+    int replication = 80; // 80 replicas per region, one for each server
+    int numRegionsPerServer = 5;
+    int numTables = 10;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
+  }
+
+  @Test (timeout = 800000)
+  public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    loadBalancer.setConf(conf);
+    int numNodes = 40;
+    int numRegions = 6 * 50;
+    int replication = 50; // 50 replicas per region, more than numNodes
+    int numRegionsPerServer = 6;
+    int numTables = 10;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, false);
+  }
+}
\ No newline at end of file


[3/3] hbase git commit: HBASE-15302 Reenable the other tests disabled by HBASE-14678

Posted by st...@apache.org.
HBASE-15302 Reenable the other tests disabled by HBASE-14678

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30cec72f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30cec72f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30cec72f

Branch: refs/heads/master
Commit: 30cec72f9ade972d7e9ce4bba527b0e6074cae60
Parents: 876a6ab
Author: Phil Yang <ud...@gmail.com>
Authored: Mon Feb 22 14:17:24 2016 +0800
Committer: stack <st...@apache.org>
Committed: Wed Feb 24 07:14:01 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/wal/WALSplitter.java    |   11 +-
 .../hbase/TestPartialResultsFromClientSide.java |  832 ++++++++
 .../TestMobSnapshotCloneIndependence.java       |   69 +
 .../client/TestSnapshotCloneIndependence.java   |  481 +++++
 .../master/TestDistributedLogSplitting.java     | 1799 ++++++++++++++++++
 .../balancer/TestStochasticLoadBalancer2.java   |   90 +
 .../TestMasterFailoverWithProcedures.java       |  514 +++++
 .../TestMobFlushSnapshotFromClient.java         |   72 +
 .../apache/hadoop/hbase/wal/TestWALSplit.java   | 1320 +++++++++++++
 .../hbase/wal/TestWALSplitCompressed.java       |   36 +
 .../hbase/client/TestReplicationShell.java      |   37 +
 11 files changed, 5256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 54b82b2..010fd37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -515,13 +515,14 @@ public class WALSplitter {
    * @param fs
    * @param logEntry
    * @param rootDir HBase root dir.
-   * @param fileBeingSplit the file being split currently. Used to generate tmp file name.
+   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
    * @return Path to file into which to dump split log edits.
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private static Path getRegionSplitEditsPath(final FileSystem fs,
-      final Entry logEntry, final Path rootDir, FileStatus fileBeingSplit)
+  @VisibleForTesting
+  static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir, String fileNameBeingSplit)
   throws IOException {
     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
@@ -556,7 +557,7 @@ public class WALSplitter {
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
-    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileBeingSplit.getPath().getName());
+    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
     return new Path(dir, fileName);
   }
 
@@ -1518,7 +1519,7 @@ public class WALSplitter {
      * @return a path with a write for that path. caller should close.
      */
     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
-      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit);
+      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
       if (regionedits == null) {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
new file mode 100644
index 0000000..a6f8373
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -0,0 +1,832 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.ClientScanner;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
+import org.apache.hadoop.hbase.filter.RandomRowFilter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * These tests are focused on testing how partial results appear to a client. Partial results are
+ * {@link Result}s that contain only a portion of a row's complete list of cells. Partial results
+ * are formed when the server breaches its maximum result size while trying to service a client's
+ * RPC request. It is the responsibility of the client-side scanner to recognize when partial
+ * results have been returned and to combine them into complete results.
+ * <p>
+ * Unless the flag {@link Scan#setAllowPartialResults(boolean)} has been set to true, the caller of
+ * {@link ResultScanner#next()} should never see partial results.
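+ * <p>
+ * Illustrative sketch (not part of this test; {@code table} is a placeholder) of a
+ * client opting in to partial results:
+ * <pre>
+ * Scan scan = new Scan();
+ * scan.setAllowPartialResults(true); // opt in to seeing partial Results
+ * scan.setMaxResultSize(1);          // force the server to chunk every row
+ * try (ResultScanner scanner = table.getScanner(scan)) {
+ *   for (Result r : scanner) {
+ *     // r.isPartial() may be true; one row's cells can span several Results
+ *   }
+ * }
+ * </pre>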
+ */
+@Category(MediumTests.class)
+public class TestPartialResultsFromClientSide {
+  private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class);
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static Table TABLE = null;
+
+  /**
+   * Table configuration
+   */
+  private static TableName TABLE_NAME = TableName.valueOf("testTable");
+
+  private static int NUM_ROWS = 5;
+  private static byte[] ROW = Bytes.toBytes("testRow");
+  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
+
+  // Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then
+  // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which
+  // breaks the simple generation of expected kv's
+  private static int NUM_FAMILIES = 10;
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
+
+  private static int NUM_QUALIFIERS = 10;
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
+
+  private static int VALUE_SIZE = 1024;
+  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
+
+  private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS;
+
+  // Approximation of the heap size of the cells in our table. Should be accessed through
+  // getCellHeapSize().
+  private static long CELL_HEAP_SIZE = -1;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+  }
+
+  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
+      byte[][] qualifiers, byte[] cellValue) throws IOException {
+    Table ht = TEST_UTIL.createTable(name, families);
+    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
+    ht.put(puts);
+
+    return ht;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Ensure that the expected key values appear in a result returned from a scanner that is
+   * combining partial results into complete results
+   * @throws Exception
+   */
+  @Test
+  public void testExpectedValuesOfPartialResults() throws Exception {
+    testExpectedValuesOfPartialResults(false);
+    testExpectedValuesOfPartialResults(true);
+  }
+
+  public void testExpectedValuesOfPartialResults(boolean reversed) throws Exception {
+    Scan partialScan = new Scan();
+    partialScan.setMaxVersions();
+    // Max result size of 1 ensures that each RPC request will return a single cell. The scanner
+    // will need to reconstruct the results into a complete result before returning to the caller
+    partialScan.setMaxResultSize(1);
+    partialScan.setReversed(reversed);
+    ResultScanner partialScanner = TABLE.getScanner(partialScan);
+
+    final int startRow = reversed ? ROWS.length - 1 : 0;
+    final int endRow = reversed ? -1 : ROWS.length;
+    final int loopDelta = reversed ? -1 : 1;
+    String message;
+
+    for (int row = startRow; row != endRow; row = row + loopDelta) {
+      message = "Ensuring the expected keyValues are present for row " + row;
+      List<Cell> expectedKeyValues = createKeyValuesForRow(ROWS[row], FAMILIES, QUALIFIERS, VALUE);
+      Result result = partialScanner.next();
+      assertFalse(result.isPartial());
+      verifyResult(result, expectedKeyValues, message);
+    }
+
+    partialScanner.close();
+  }
+
+  /**
+   * Ensure that we only see Results marked as partial when the allowPartial flag is set
+   * @throws Exception
+   */
+  @Test
+  public void testAllowPartialResults() throws Exception {
+    Scan scan = new Scan();
+    scan.setAllowPartialResults(true);
+    scan.setMaxResultSize(1);
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result result = scanner.next();
+
+    assertTrue(result != null);
+    assertTrue(result.isPartial());
+    assertTrue(result.rawCells() != null);
+    assertTrue(result.rawCells().length == 1);
+
+    scanner.close();
+
+    scan.setAllowPartialResults(false);
+    scanner = TABLE.getScanner(scan);
+    result = scanner.next();
+
+    assertTrue(result != null);
+    assertTrue(!result.isPartial());
+    assertTrue(result.rawCells() != null);
+    assertTrue(result.rawCells().length == NUM_COLS);
+
+    scanner.close();
+  }
+
+  /**
+   * Ensure that the results returned from a scanner that retrieves all results in a single RPC
+   * call match the results returned from a scanner that must incrementally combine partial
+   * results into complete results. A variety of scan configurations can be tested
+   * @throws Exception
+   */
+  @Test
+  public void testEquivalenceOfScanResults() throws Exception {
+    Scan oneShotScan = new Scan();
+    oneShotScan.setMaxResultSize(Long.MAX_VALUE);
+
+    Scan partialScan = new Scan(oneShotScan);
+    partialScan.setMaxResultSize(1);
+
+    testEquivalenceOfScanResults(TABLE, oneShotScan, partialScan);
+  }
+
+  public void testEquivalenceOfScanResults(Table table, Scan scan1, Scan scan2) throws Exception {
+    ResultScanner scanner1 = table.getScanner(scan1);
+    ResultScanner scanner2 = table.getScanner(scan2);
+
+    Result r1 = null;
+    Result r2 = null;
+    int count = 0;
+
+    while ((r1 = scanner1.next()) != null) {
+      r2 = scanner2.next();
+
+      assertTrue(r2 != null);
+      compareResults(r1, r2, "Comparing result #" + count);
+      count++;
+    }
+
+    r2 = scanner2.next();
+    assertTrue("r2: " + r2 + " Should be null", r2 == null);
+
+    scanner1.close();
+    scanner2.close();
+  }
+
+  /**
+   * Ensure that the order of cells in partial results matches the ordering of cells from
+   * complete results
+   * @throws Exception
+   */
+  @Test
+  public void testOrderingOfCellsInPartialResults() throws Exception {
+    Scan scan = new Scan();
+
+    for (int col = 1; col <= NUM_COLS; col++) {
+      scan.setMaxResultSize(getResultSizeForNumberOfCells(col));
+      testOrderingOfCellsInPartialResults(scan);
+
+      // Test again with a reversed scanner
+      scan.setReversed(true);
+      testOrderingOfCellsInPartialResults(scan);
+    }
+  }
+
+  public void testOrderingOfCellsInPartialResults(final Scan basePartialScan) throws Exception {
+    // Scan that retrieves results in pieces (partials). By setting allowPartialResults to be true
+    // the results will NOT be reconstructed and instead the caller will see the partial results
+    // returned by the server
+    Scan partialScan = new Scan(basePartialScan);
+    partialScan.setAllowPartialResults(true);
+    ResultScanner partialScanner = TABLE.getScanner(partialScan);
+
+    // Scan that retrieves all table results in single RPC request
+    Scan oneShotScan = new Scan(basePartialScan);
+    oneShotScan.setMaxResultSize(Long.MAX_VALUE);
+    oneShotScan.setCaching(ROWS.length);
+    ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan);
+
+    Result oneShotResult = oneShotScanner.next();
+    Result partialResult = null;
+    int iterationCount = 0;
+
+    while (oneShotResult != null && oneShotResult.rawCells() != null) {
+      List<Cell> aggregatePartialCells = new ArrayList<Cell>();
+      do {
+        partialResult = partialScanner.next();
+        assertTrue("Partial Result is null. iteration: " + iterationCount, partialResult != null);
+        assertTrue("Partial cells are null. iteration: " + iterationCount,
+            partialResult.rawCells() != null);
+
+        for (Cell c : partialResult.rawCells()) {
+          aggregatePartialCells.add(c);
+        }
+      } while (partialResult.isPartial());
+
+      assertTrue("Number of cells differs. iteration: " + iterationCount,
+          oneShotResult.rawCells().length == aggregatePartialCells.size());
+      final Cell[] oneShotCells = oneShotResult.rawCells();
+      for (int cell = 0; cell < oneShotCells.length; cell++) {
+        Cell oneShotCell = oneShotCells[cell];
+        Cell partialCell = aggregatePartialCells.get(cell);
+
+        assertTrue("One shot cell was null", oneShotCell != null);
+        assertTrue("Partial cell was null", partialCell != null);
+        assertTrue("Cell differs. oneShotCell:" + oneShotCell + " partialCell:" + partialCell,
+            oneShotCell.equals(partialCell));
+      }
+
+      oneShotResult = oneShotScanner.next();
+      iterationCount++;
+    }
+
+    assertTrue(partialScanner.next() == null);
+
+    partialScanner.close();
+    oneShotScanner.close();
+  }
+
+  /**
+   * Setting the max result size allows us to control how many cells we expect to see on each call
+   * to next on the scanner. Test a variety of different sizes for correctness
+   * @throws Exception
+   */
+  @Test
+  public void testExpectedNumberOfCellsPerPartialResult() throws Exception {
+    Scan scan = new Scan();
+    testExpectedNumberOfCellsPerPartialResult(scan);
+
+    scan.setReversed(true);
+    testExpectedNumberOfCellsPerPartialResult(scan);
+  }
+
+  public void testExpectedNumberOfCellsPerPartialResult(Scan baseScan) throws Exception {
+    for (int expectedCells = 1; expectedCells <= NUM_COLS; expectedCells++) {
+      testExpectedNumberOfCellsPerPartialResult(baseScan, expectedCells);
+    }
+  }
+
+  public void testExpectedNumberOfCellsPerPartialResult(Scan baseScan, int expectedNumberOfCells)
+      throws Exception {
+
+    if (LOG.isInfoEnabled()) LOG.info("groupSize:" + expectedNumberOfCells);
+
+    // Use the cellHeapSize to set maxResultSize such that we know how many cells to expect back
+    // from the call. The returned results should NOT exceed expectedNumberOfCells but may be less
+    // than it in cases where expectedNumberOfCells is not an exact multiple of the number of
+    // columns in the table.
+    Scan scan = new Scan(baseScan);
+    scan.setAllowPartialResults(true);
+    scan.setMaxResultSize(getResultSizeForNumberOfCells(expectedNumberOfCells));
+
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result result = null;
+    byte[] prevRow = null;
+    while ((result = scanner.next()) != null) {
+      assertTrue(result.rawCells() != null);
+
+      // Cases when cell count won't equal expectedNumberOfCells:
+      // 1. Returned result is the final result needed to form the complete result for that row
+      // 2. It is the first result we have seen for that row and thus may have been fetched as
+      // the last group of cells that fit inside the maxResultSize
+      assertTrue(
+          "Result's cell count differed from expected number. result: " + result,
+          result.rawCells().length == expectedNumberOfCells || !result.isPartial()
+              || !Bytes.equals(prevRow, result.getRow()));
+      prevRow = result.getRow();
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * @return The approximate heap size of a cell in the test table. All cells should have
+   *         approximately the same heap size, so the value is cached to avoid repeating the
+   *         calculation
+   * @throws Exception
+   */
+  private long getCellHeapSize() throws Exception {
+    if (CELL_HEAP_SIZE == -1) {
+      // Do a partial scan that will return a single result with a single cell
+      Scan scan = new Scan();
+      scan.setMaxResultSize(1);
+      scan.setAllowPartialResults(true);
+      ResultScanner scanner = TABLE.getScanner(scan);
+
+      Result result = scanner.next();
+
+      assertTrue(result != null);
+      assertTrue(result.rawCells() != null);
+      assertTrue(result.rawCells().length == 1);
+
+      CELL_HEAP_SIZE = CellUtil.estimatedHeapSizeOf(result.rawCells()[0]);
+      if (LOG.isInfoEnabled()) LOG.info("Cell heap size: " + CELL_HEAP_SIZE);
+      scanner.close();
+    }
+
+    return CELL_HEAP_SIZE;
+  }
+
+  /**
+   * @param numberOfCells the number of cells each RPC should return
+   * @return the result size (getCellHeapSize() * numberOfCells) that should be used in
+   *         {@link Scan#setMaxResultSize(long)} if you want the server to return exactly
+   *         numberOfCells cells
+   * @throws Exception
+   */
+  private long getResultSizeForNumberOfCells(int numberOfCells) throws Exception {
+    return getCellHeapSize() * numberOfCells;
+  }
+
+  /**
+   * Test various combinations of batching and partial results for correctness
+   */
+  @Test
+  public void testPartialResultsAndBatch() throws Exception {
+    for (int batch = 1; batch <= NUM_COLS / 4; batch++) {
+      for (int cellsPerPartial = 1; cellsPerPartial <= NUM_COLS / 4; cellsPerPartial++) {
+        testPartialResultsAndBatch(batch, cellsPerPartial);
+      }
+    }
+  }
+
+  public void testPartialResultsAndBatch(final int batch, final int cellsPerPartialResult)
+      throws Exception {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("batch: " + batch + " cellsPerPartialResult: " + cellsPerPartialResult);
+    }
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(getResultSizeForNumberOfCells(cellsPerPartialResult));
+    scan.setBatch(batch);
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result result = null;
+    int repCount = 0;
+
+    while ((result = scanner.next()) != null) {
+      assertTrue(result.rawCells() != null);
+
+      if (result.isPartial()) {
+        final String error =
+            "Cells:" + result.rawCells().length + " Batch size:" + batch
+                + " cellsPerPartialResult:" + cellsPerPartialResult + " rep:" + repCount;
+        assertTrue(error, result.rawCells().length <= Math.min(batch, cellsPerPartialResult));
+      } else {
+        assertTrue(result.rawCells().length <= batch);
+      }
+      repCount++;
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Test the method {@link Result#createCompleteResult(List)}
+   * @throws Exception
+   */
+  @Test
+  public void testPartialResultsReassembly() throws Exception {
+    Scan scan = new Scan();
+    testPartialResultsReassembly(scan);
+    scan.setReversed(true);
+    testPartialResultsReassembly(scan);
+  }
+
+  public void testPartialResultsReassembly(Scan scanBase) throws Exception {
+    Scan partialScan = new Scan(scanBase);
+    partialScan.setMaxResultSize(1);
+    partialScan.setAllowPartialResults(true);
+    ResultScanner partialScanner = TABLE.getScanner(partialScan);
+
+    Scan oneShotScan = new Scan(scanBase);
+    oneShotScan.setMaxResultSize(Long.MAX_VALUE);
+    ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan);
+
+    ArrayList<Result> partials = new ArrayList<>();
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result partialResult = null;
+      Result completeResult = null;
+      Result oneShotResult = null;
+      partials.clear();
+
+      do {
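+        // Collect every chunk for the row, including the terminal chunk whose isPartial() is
+        // false; Result.createCompleteResult below stitches the list back into one row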
+        partialResult = partialScanner.next();
+        partials.add(partialResult);
+      } while (partialResult != null && partialResult.isPartial());
+
+      completeResult = Result.createCompleteResult(partials);
+      oneShotResult = oneShotScanner.next();
+
+      compareResults(completeResult, oneShotResult, null);
+    }
+
+    assertTrue(oneShotScanner.next() == null);
+    assertTrue(partialScanner.next() == null);
+
+    oneShotScanner.close();
+    partialScanner.close();
+  }
+
+  /**
+   * When reconstructing the complete result from its partials we ensure that the row of each
+   * partial result is the same. If one of the rows differs, an exception is thrown.
+   */
+  @Test
+  public void testExceptionThrownOnMismatchedPartialResults() throws IOException {
+    assertTrue(NUM_ROWS >= 2);
+
+    ArrayList<Result> partials = new ArrayList<>();
+    Scan scan = new Scan();
+    scan.setMaxResultSize(Long.MAX_VALUE);
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result r1 = scanner.next();
+    partials.add(r1);
+    Result r2 = scanner.next();
+    partials.add(r2);
+
+    assertFalse(Bytes.equals(r1.getRow(), r2.getRow()));
+
+    try {
+      Result.createCompleteResult(partials);
+      fail("r1 and r2 are from different rows. It should not be possible to combine them into"
+          + " a single result");
+    } catch (IOException e) {
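+      // Expected: the two Results come from different rows, so they cannot be combined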
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * When a scan has a filter where {@link org.apache.hadoop.hbase.filter.Filter#hasFilterRow()} is
+   * true, the scanner should not return partial results; the entire row must be read before the
+   * filter's include/exclude decision can be made.
+   */
+  @Test
+  public void testNoPartialResultsWhenRowFilterPresent() throws Exception {
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    // If a filter's hasFilterRow() is true then partial results should not be returned, else
+    // server-side filter application would break.
+    scan.setFilter(new RandomRowFilter(1.0f));
+    ResultScanner scanner = TABLE.getScanner(scan);
+
+    Result r = null;
+    while ((r = scanner.next()) != null) {
+      assertFalse(r.isPartial());
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Examine the interaction between the maxResultSize and caching. If the caching limit is reached
+   * before the maxResultSize limit, we should not see partial results. On the other hand, if the
+   * maxResultSize limit is reached before the caching limit, it is likely that partial results will
+   * be seen.
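+   * For instance (an illustrative sketch, not tied to the exact loop bounds below): with
+   * caching=5 but a maxResultSize that holds roughly one row of cells, each RPC fills up on
+   * size before it fills up on row count, so partial results appear.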
+   * @throws Exception
+   */
+  @Test
+  public void testPartialResultsAndCaching() throws Exception {
+    for (int caching = 1; caching <= NUM_ROWS; caching++) {
+      for (int maxResultRows = 0; maxResultRows <= NUM_ROWS; maxResultRows++) {
+        testPartialResultsAndCaching(maxResultRows, caching);
+      }
+    }
+  }
+
+  /**
+   * @param resultSizeRowLimit The row limit that will be enforced through maxResultSize
+   * @param cachingRowLimit The row limit that will be enforced through caching
+   * @throws Exception
+   */
+  public void testPartialResultsAndCaching(int resultSizeRowLimit, int cachingRowLimit)
+      throws Exception {
+    Scan scan = new Scan();
+    scan.setAllowPartialResults(true);
+
+    // The number of cells specified in the call to getResultSizeForNumberOfCells is offset to
+    // ensure that the result size we specify is not an exact multiple of the number of cells
+    // in a row. This ensures that partial results will be returned when the result size limit
+    // is reached before the caching limit.
+    int cellOffset = NUM_COLS / 3;
+    long maxResultSize = getResultSizeForNumberOfCells(resultSizeRowLimit * NUM_COLS + cellOffset);
+    scan.setMaxResultSize(maxResultSize);
+    scan.setCaching(cachingRowLimit);
+
+    ResultScanner scanner = TABLE.getScanner(scan);
+    ClientScanner clientScanner = (ClientScanner) scanner;
+    Result r = null;
+
+    // resultSizeRowLimit approximates the number of rows that fit into the specified max result
+    // size. If this approximation is less than caching, then we expect that the max result size
+    // limit will be hit before the caching limit and thus partial results may be seen
+    boolean expectToSeePartialResults = resultSizeRowLimit < cachingRowLimit;
+    while ((r = clientScanner.next()) != null) {
+      assertTrue(!r.isPartial() || expectToSeePartialResults);
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Small scans should not return partial results because that would prevent small scans from
+   * retrieving all of the necessary results in a single RPC request, which is what makes small
+   * scans useful. Thus, ensure that even when {@link Scan#getAllowPartialResults()} is true, small
+   * scans do not return partial results.
+   * @throws Exception
+   */
+  @Test
+  public void testSmallScansDoNotAllowPartials() throws Exception {
+    Scan scan = new Scan();
+    testSmallScansDoNotAllowPartials(scan);
+    scan.setReversed(true);
+    testSmallScansDoNotAllowPartials(scan);
+  }
+
+  public void testSmallScansDoNotAllowPartials(Scan baseScan) throws Exception {
+    Scan scan = new Scan(baseScan);
+    scan.setAllowPartialResults(true);
+    scan.setSmall(true);
+    scan.setMaxResultSize(1);
+
+    ResultScanner scanner = TABLE.getScanner(scan);
+    Result r = null;
+
+    while ((r = scanner.next()) != null) {
+      assertFalse(r.isPartial());
+    }
+
+    scanner.close();
+  }
+
+  /**
+   * Make puts that write the input value into each combination of row, family, and qualifier
+   * @param rows the row keys to write to
+   * @param families the column families to write to
+   * @param qualifiers the column qualifiers to write to
+   * @param value the cell value to write
+   * @return one Put per row, covering every family/qualifier combination
+   * @throws IOException
+   */
+  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
+      byte[] value) throws IOException {
+    Put put;
+    ArrayList<Put> puts = new ArrayList<>();
+
+    for (int row = 0; row < rows.length; row++) {
+      put = new Put(rows[row]);
+      for (int fam = 0; fam < families.length; fam++) {
+        for (int qual = 0; qual < qualifiers.length; qual++) {
+          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
+          put.add(kv);
+        }
+      }
+      puts.add(put);
+    }
+
+    return puts;
+  }
+
+  /**
+   * Make key values to represent each possible combination of family and qualifier in the specified
+   * row.
+   * @param row the row key
+   * @param families the column families
+   * @param qualifiers the column qualifiers
+   * @param value the cell value
+   * @return the expected key values for the row, in the order a scan would return them
+   */
+  static ArrayList<Cell> createKeyValuesForRow(byte[] row, byte[][] families, byte[][] qualifiers,
+      byte[] value) {
+    ArrayList<Cell> outList = new ArrayList<>();
+    for (int fam = 0; fam < families.length; fam++) {
+      for (int qual = 0; qual < qualifiers.length; qual++) {
+        outList.add(new KeyValue(row, families[fam], qualifiers[qual], qual, value));
+      }
+    }
+    return outList;
+  }
+
+  /**
+   * Verifies that the result contains all the key values within expKvList. Fails the test
+   * otherwise
+   * @param result the result returned by the scanner
+   * @param expKvList the expected key values
+   * @param msg a message to log before verification
+   */
+  static void verifyResult(Result result, List<Cell> expKvList, String msg) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info(msg);
+      LOG.info("Expected count: " + expKvList.size());
+      LOG.info("Actual count: " + result.size());
+    }
+
+    if (expKvList.size() == 0) return;
+
+    int i = 0;
+    for (Cell kv : result.rawCells()) {
+      if (i >= expKvList.size()) {
+        break; // we will check the size later
+      }
+
+      Cell kvExp = expKvList.get(i++);
+      assertTrue("Not equal. get kv: " + kv.toString() + " exp kv: " + kvExp.toString(),
+          kvExp.equals(kv));
+    }
+
+    assertEquals(expKvList.size(), result.size());
+  }
+
+  /**
+   * Compares two results and fails the test if the results are different
+   * @param r1 the first result
+   * @param r2 the second result
+   * @param message an optional message to log before comparing
+   */
+  static void compareResults(Result r1, Result r2, final String message) {
+    if (LOG.isInfoEnabled()) {
+      if (message != null) LOG.info(message);
+      LOG.info("r1: " + r1);
+      LOG.info("r2: " + r2);
+    }
+
+    final String failureMessage = "Results r1:" + r1 + " \nr2:" + r2 + " are not equivalent";
+    if (r1 == null || r2 == null) fail(failureMessage);
+
+    try {
+      Result.compareResults(r1, r2);
+    } catch (Exception e) {
+      fail(failureMessage);
+    }
+  }
+
+  @Test
+  public void testReadPointAndPartialResults() throws Exception {
+    TableName testName = TableName.valueOf("testReadPointAndPartialResults");
+    int numRows = 5;
+    int numFamilies = 5;
+    int numQualifiers = 5;
+    byte[][] rows = HTestConst.makeNAscii(Bytes.toBytes("testRow"), numRows);
+    byte[][] families = HTestConst.makeNAscii(Bytes.toBytes("testFamily"), numFamilies);
+    byte[][] qualifiers = HTestConst.makeNAscii(Bytes.toBytes("testQualifier"), numQualifiers);
+    byte[] value = Bytes.createMaxByteArray(100);
+
+    Table tmpTable = createTestTable(testName, rows, families, qualifiers, value);
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+
+    // Open scanner before deletes
+    ResultScanner scanner = tmpTable.getScanner(scan);
+
+    Delete delete1 = new Delete(rows[0]);
+    delete1.addColumn(families[0], qualifiers[0], 0);
+    tmpTable.delete(delete1);
+
+    Delete delete2 = new Delete(rows[1]);
+    delete2.addColumn(families[1], qualifiers[1], 1);
+    tmpTable.delete(delete2);
+
+    // Should see all cells because scanner was opened prior to deletes
+    int scannerCount = countCellsFromScanner(scanner);
+    int expectedCount = numRows * numFamilies * numQualifiers;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+        scannerCount == expectedCount);
+
+    // Minus 2 for the two cells that were deleted
+    scanner = tmpTable.getScanner(scan);
+    scannerCount = countCellsFromScanner(scanner);
+    expectedCount = numRows * numFamilies * numQualifiers - 2;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+        scannerCount == expectedCount);
+
+    scanner = tmpTable.getScanner(scan);
+    // Put in 2 new cells. Their timestamps differ from those of the deleted cells
+    Put put1 = new Put(rows[0]);
+    put1.add(new KeyValue(rows[0], families[0], qualifiers[0], 1, value));
+    tmpTable.put(put1);
+
+    Put put2 = new Put(rows[1]);
+    put2.add(new KeyValue(rows[1], families[1], qualifiers[1], 2, value));
+    tmpTable.put(put2);
+
+    // Scanner opened prior to puts. Cell count shouldn't have changed
+    scannerCount = countCellsFromScanner(scanner);
+    expectedCount = numRows * numFamilies * numQualifiers - 2;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+        scannerCount == expectedCount);
+
+    // Now the scanner should see the cells that were added by puts
+    scanner = tmpTable.getScanner(scan);
+    scannerCount = countCellsFromScanner(scanner);
+    expectedCount = numRows * numFamilies * numQualifiers;
+    assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount,
+        scannerCount == expectedCount);
+
+    TEST_UTIL.deleteTable(testName);
+  }
+
+  /**
+   * Exhausts the scanner by calling next repeatedly. Once completely exhausted, closes the
+   * scanner and returns the total cell count
+   * @param scanner the scanner to exhaust
+   * @return the total number of cells returned by the scanner
+   * @throws Exception
+   */
+  private int countCellsFromScanner(ResultScanner scanner) throws Exception {
+    Result result = null;
+    int numCells = 0;
+    while ((result = scanner.next()) != null) {
+      numCells += result.rawCells().length;
+    }
+
+    scanner.close();
+    return numCells;
+  }
+
+  /**
+   * Test partial Result re-assembly in the presence of different filters. The Results from the
+   * partial scanner should match the Results returned from a scanner that receives all of the
+   * results in one RPC to the server. The partial scanner is tested with a variety of different
+   * result sizes (all of which are less than the size necessary to fetch an entire row)
+   * @throws Exception
+   */
+  @Test
+  public void testPartialResultsWithColumnFilter() throws Exception {
+    testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter());
+    testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5")));
+    testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifier1"), true,
+        Bytes.toBytes("testQualifier7"), true));
+
+    Set<byte[]> qualifiers = new LinkedHashSet<>();
+    qualifiers.add(Bytes.toBytes("testQualifier5"));
+    testPartialResultsWithColumnFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
+  }
+
+  public void testPartialResultsWithColumnFilter(Filter filter) throws Exception {
+    assertTrue(!filter.hasFilterRow());
+
+    Scan partialScan = new Scan();
+    partialScan.setFilter(filter);
+
+    Scan oneshotScan = new Scan();
+    oneshotScan.setFilter(filter);
+    oneshotScan.setMaxResultSize(Long.MAX_VALUE);
+
+    for (int i = 1; i <= NUM_COLS; i++) {
+      partialScan.setMaxResultSize(getResultSizeForNumberOfCells(i));
+      testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
new file mode 100644
index 0000000..dcf20e5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobSnapshotCloneIndependence.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to verify that the cloned table is independent of the table from which it was cloned
+ */
+@Category(LargeTests.class)
+public class TestMobSnapshotCloneIndependence extends TestSnapshotCloneIndependence {
+  private static final Log LOG = LogFactory.getLog(TestMobSnapshotCloneIndependence.class);
+
+  /**
+   * Set up the config for the cluster and start it
+   * @throws Exception on failure
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_RS);
+  }
+
+  protected static void setupConf(Configuration conf) {
+    TestSnapshotCloneIndependence.setupConf(conf);
+    conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+  }
+
+  @Override
+  protected Table createTable(final TableName table, byte[] family) throws Exception {
+    return MobSnapshotTestingUtils.createMobTable(UTIL, table, family);
+  }
+
+  @Override
+  public void loadData(final Table table, byte[]... families) throws Exception {
+    SnapshotTestingUtils.loadData(UTIL, table.getName(), 1000, families);
+  }
+
+  @Override
+  protected int countRows(final Table table, final byte[]... families) throws Exception {
+    return MobSnapshotTestingUtils.countMobRows(table, families);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java
new file mode 100644
index 0000000..0a3093b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to verify that the cloned table is independent of the table from which it was cloned
+ */
+@Category({LargeTests.class, ClientTests.class})
+public class TestSnapshotCloneIndependence {
+  private static final Log LOG = LogFactory.getLog(TestSnapshotCloneIndependence.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  protected static final int NUM_RS = 2;
+  private static final String STRING_TABLE_NAME = "test";
+  private static final String TEST_FAM_STR = "fam";
+  protected static final byte[] TEST_FAM = Bytes.toBytes(TEST_FAM_STR);
+  protected static final TableName TABLE_NAME = TableName.valueOf(STRING_TABLE_NAME);
+  private static final int CLEANER_INTERVAL = 100;
+
+  /**
+   * Set up the config for the cluster and start it
+   * @throws Exception on failure
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_RS);
+  }
+
+  static void setupConf(Configuration conf) {
+    // Up the handlers; this test needs more than usual.
+    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 15);
+    // enable snapshot support
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    // disable the ui
+    conf.setInt("hbase.regionsever.info.port", -1);
+    conf.setInt("hbase.master.info.port", -1);
+    // change the flush size to a small amount, regulating the number of store files
+    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+    // make sure we get a compaction when doing a load, but keep around
+    // some files in the store
+    conf.setInt("hbase.hstore.compaction.min", 10);
+    conf.setInt("hbase.hstore.compactionThreshold", 10);
+    // block writes if we get to 12 store files
+    conf.setInt("hbase.hstore.blockingStoreFiles", 12);
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+    // Avoid potentially aggressive splitting which would cause snapshot to fail
+    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+        ConstantSizeRegionSplitPolicy.class.getName());
+    // Execute cleaner frequently to induce failures
+    conf.setInt("hbase.master.cleaner.interval", CLEANER_INTERVAL);
+    conf.setInt("hbase.master.hfilecleaner.plugins.snapshot.period", CLEANER_INTERVAL);
+    // Effectively disable TimeToLiveHFileCleaner. Don't want to fully disable it because that
+    // will even trigger races between creating the directory containing back references and
+    // the back reference itself.
+    conf.setInt("hbase.master.hfilecleaner.ttl", CLEANER_INTERVAL);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    createTable(TABLE_NAME, TEST_FAM);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.deleteTable(TABLE_NAME);
+    SnapshotTestingUtils.deleteAllSnapshots(UTIL.getHBaseAdmin());
+    SnapshotTestingUtils.deleteArchiveDirectory(UTIL);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  /**
+   * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+   * it is taken as an online snapshot.
+   */
+  @Ignore ("Flakey. Fix") @Test (timeout=300000)
+  public void testOnlineSnapshotAppendIndependent() throws Exception {
+    runTestSnapshotAppendIndependent(true);
+  }
+
+  /**
+   * Verify that adding data to the cloned table will not affect the original, and vice-versa when
+   * it is taken as an offline snapshot.
+   */
+  @Test (timeout=300000)
+  @Ignore
+  public void testOfflineSnapshotAppendIndependent() throws Exception {
+    runTestSnapshotAppendIndependent(false);
+  }
+
+  /**
+   * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
+   * when it is taken as an online snapshot.
+   */
+  @Test (timeout=300000)
+  public void testOnlineSnapshotMetadataChangesIndependent() throws Exception {
+    runTestSnapshotMetadataChangesIndependent(true);
+  }
+
+  /**
+   * Verify that adding metadata to the cloned table will not affect the original, and vice-versa
+   * when it is taken as an offline snapshot.
+   */
+  @Test (timeout=300000)
+  @Ignore
+  public void testOfflineSnapshotMetadataChangesIndependent() throws Exception {
+    runTestSnapshotMetadataChangesIndependent(false);
+  }
+
+  /**
+   * Verify that region operations, in this case splitting a region, are independent between the
+   * cloned table and the original.
+   */
+  @Test (timeout=300000)
+  @Ignore
+  public void testOfflineSnapshotRegionOperationsIndependent() throws Exception {
+    runTestRegionOperationsIndependent(false);
+  }
+
+  /**
+   * Verify that region operations, in this case splitting a region, are independent between the
+   * cloned table and the original.
+   */
+  @Test (timeout=300000)
+  public void testOnlineSnapshotRegionOperationsIndependent() throws Exception {
+    runTestRegionOperationsIndependent(true);
+  }
+
+  @Test (timeout=300000)
+  @Ignore
+  public void testOfflineSnapshotDeleteIndependent() throws Exception {
+    runTestSnapshotDeleteIndependent(false);
+  }
+
+  @Ignore ("Flakey test") @Test (timeout=300000)
+  public void testOnlineSnapshotDeleteIndependent() throws Exception {
+    runTestSnapshotDeleteIndependent(true);
+  }
+
+  private static void waitOnSplit(Connection c, final Table t, int originalCount) throws Exception {
+    for (int i = 0; i < 200; i++) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+      }
+      try (RegionLocator locator = c.getRegionLocator(t.getName())) {
+        if (locator.getAllRegionLocations().size() > originalCount) {
+          return;
+        }
+      }
+    }
+    throw new Exception("Split did not increase the number of regions");
+  }
+
+  /*
+   * Take a snapshot of a table, add data, and verify that this only
+   * affects one table
+   * @param online - Whether the table is online or not during the snapshot
+   */
+  private void runTestSnapshotAppendIndependent(boolean online) throws Exception {
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+    Admin admin = UTIL.getHBaseAdmin();
+    final long startTime = System.currentTimeMillis();
+    final TableName localTableName =
+        TableName.valueOf(STRING_TABLE_NAME + startTime);
+
+    try (Table original = createTable(localTableName, TEST_FAM)) {
+      loadData(original, TEST_FAM);
+      final int origTableRowCount = countRows(original);
+
+      // Take a snapshot
+      final String snapshotNameAsString = "snapshot_" + localTableName;
+      byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+          snapshotNameAsString, rootDir, fs, online);
+
+      if (!online) {
+        tryDisable(admin, localTableName);
+      }
+      TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+      admin.cloneSnapshot(snapshotName, cloneTableName);
+
+      try (Table clonedTable = UTIL.getConnection().getTable(cloneTableName)) {
+
+        // Make sure that all the regions are available before starting
+        UTIL.waitUntilAllRegionsAssigned(cloneTableName);
+
+        final int clonedTableRowCount = countRows(clonedTable);
+
+        Assert.assertEquals(
+            "The line counts of original and cloned tables do not match after clone. ",
+            origTableRowCount, clonedTableRowCount);
+
+        // Attempt to add data to the original table
+        final String rowKey = "new-row-" + System.currentTimeMillis();
+
+        Put p = new Put(Bytes.toBytes(rowKey));
+        p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+        original.put(p);
+
+        // Verify that the new row shows up in the original table but not in the clone
+        Assert.assertEquals("The row count of the original table was not modified by the put",
+            origTableRowCount + 1, countRows(original));
+        Assert.assertEquals(
+            "The row count of the cloned table changed as a result of addition to the original",
+            clonedTableRowCount, countRows(clonedTable));
+
+        p = new Put(Bytes.toBytes(rowKey));
+        p.addColumn(TEST_FAM, Bytes.toBytes("someQualifier"), Bytes.toBytes("someString"));
+        clonedTable.put(p);
+
+        // Verify that the put to the clone did not change the original table's row count
+        Assert.assertEquals(
+            "The row count of the original table was modified by the put to the clone",
+            origTableRowCount + 1, countRows(original));
+        Assert.assertEquals("The row count of the cloned table was not modified by the put",
+            clonedTableRowCount + 1, countRows(clonedTable));
+      }
+    }
+  }
+
+  /*
+   * Take a snapshot of a table, do a split, and verify that this only affects one table
+   * @param online - Whether the table is online or not during the snapshot
+   */
+  private void runTestRegionOperationsIndependent(boolean online) throws Exception {
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+    // Create a table
+    Admin admin = UTIL.getHBaseAdmin();
+    final long startTime = System.currentTimeMillis();
+    final TableName localTableName =
+        TableName.valueOf(STRING_TABLE_NAME + startTime);
+    Table original = createTable(localTableName, TEST_FAM);
+    loadData(original, TEST_FAM);
+    final int loadedTableCount = countRows(original);
+    LOG.info("Original table has: " + loadedTableCount + " rows");
+
+    final String snapshotNameAsString = "snapshot_" + localTableName;
+
+    // Create a snapshot
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+        snapshotNameAsString, rootDir, fs, online);
+
+    if (!online) {
+      tryDisable(admin, localTableName);
+    }
+
+    TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+
+    // Clone the snapshot
+    byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+    admin.cloneSnapshot(snapshotName, cloneTableName);
+
+    // Verify that region information is the same pre-split
+    ((ClusterConnection) UTIL.getConnection()).clearRegionCache();
+    List<HRegionInfo> originalTableHRegions = admin.getTableRegions(localTableName);
+
+    final int originalRegionCount = originalTableHRegions.size();
+    final int cloneTableRegionCount = admin.getTableRegions(cloneTableName).size();
+    Assert.assertEquals(
+        "The number of regions in the cloned table is different than in the original table.",
+        originalRegionCount, cloneTableRegionCount);
+
+    // Split a region on the parent table
+    admin.splitRegion(originalTableHRegions.get(0).getRegionName());
+    waitOnSplit(UTIL.getConnection(), original, originalRegionCount);
+
+    // Verify that the cloned table region is not split
+    final int cloneTableRegionCount2 = admin.getTableRegions(cloneTableName).size();
+    Assert.assertEquals(
+        "The number of regions in the cloned table changed though none of its regions were split.",
+        cloneTableRegionCount, cloneTableRegionCount2);
+  }
+
+  /*
+   * Take a snapshot of a table, add metadata, and verify that this only
+   * affects one table
+   * @param online - Whether the table is online or not during the snapshot
+   */
+  private void runTestSnapshotMetadataChangesIndependent(boolean online) throws Exception {
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+    // Create a table
+    Admin admin = UTIL.getHBaseAdmin();
+    final long startTime = System.currentTimeMillis();
+    final TableName localTableName =
+        TableName.valueOf(STRING_TABLE_NAME + startTime);
+    Table original = createTable(localTableName, TEST_FAM);
+    loadData(original, TEST_FAM);
+
+    final String snapshotNameAsString = "snapshot_" + localTableName;
+
+    // Create a snapshot
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+        snapshotNameAsString, rootDir, fs, online);
+
+    if (!online) {
+      tryDisable(admin, localTableName);
+    }
+
+    TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+
+    // Clone the snapshot
+    byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+    admin.cloneSnapshot(snapshotName, cloneTableName);
+
+    // Add a new column family to the original table
+    byte[] TEST_FAM_2 = Bytes.toBytes("fam2");
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM_2);
+
+    tryDisable(admin, localTableName);
+    admin.addColumnFamily(localTableName, hcd);
+
+    // Verify that it is not in the snapshot
+    admin.enableTable(localTableName);
+    UTIL.waitTableAvailable(localTableName);
+
+    // get a description of the cloned table
+    // get a list of its families
+    // assert that the family is there
+    HTableDescriptor originalTableDescriptor = original.getTableDescriptor();
+    HTableDescriptor clonedTableDescriptor = admin.getTableDescriptor(cloneTableName);
+
+    Assert.assertTrue("The original family was not found. There is something wrong. ",
+        originalTableDescriptor.hasFamily(TEST_FAM));
+    Assert.assertTrue("The original family was not found in the clone. There is something wrong. ",
+        clonedTableDescriptor.hasFamily(TEST_FAM));
+
+    Assert.assertTrue("The new family was not found. ",
+        originalTableDescriptor.hasFamily(TEST_FAM_2));
+    Assert.assertTrue("The new family was not found. ",
+        !clonedTableDescriptor.hasFamily(TEST_FAM_2));
+  }
+
+  private void tryDisable(Admin admin, TableName localTableName) throws IOException {
+    int offlineRetry = 0;
+    while (offlineRetry < 5 && admin.isTableEnabled(localTableName)) {
+      try {
+        admin.disableTable(localTableName);
+      } catch (IOException ioe) {
+        LOG.warn("Error disabling the table", ioe);
+      }
+      offlineRetry++;
+    }
+  }
+
+  /*
+   * Take a snapshot of a table, add data, and verify that deleting the snapshot does not affect
+   * either table.
+   * @param online - Whether the table is online or not during the snapshot
+   */
+  private void runTestSnapshotDeleteIndependent(boolean online) throws Exception {
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+
+    final Admin admin = UTIL.getHBaseAdmin();
+    final long startTime = System.currentTimeMillis();
+    final TableName localTableName =
+        TableName.valueOf(STRING_TABLE_NAME + startTime);
+
+    try (Table original = createTable(localTableName, TEST_FAM)) {
+      loadData(original, TEST_FAM);
+    }
+
+    // Take a snapshot
+    final String snapshotNameAsString = "snapshot_" + localTableName;
+    byte[] snapshotName = Bytes.toBytes(snapshotNameAsString);
+
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, localTableName, TEST_FAM_STR,
+        snapshotNameAsString, rootDir, fs, online);
+
+    if (!online) {
+      tryDisable(admin, localTableName);
+    }
+
+    TableName cloneTableName = TableName.valueOf("test-clone-" + localTableName);
+    admin.cloneSnapshot(snapshotName, cloneTableName);
+
+    UTIL.waitUntilAllRegionsAssigned(cloneTableName);
+
+    // Ensure the original table does not reference the HFiles anymore
+    admin.majorCompact(localTableName);
+
+    // Deleting the snapshot used to break the cloned table by deleting in-use HFiles
+    admin.deleteSnapshot(snapshotName);
+
+    // Wait for cleaner run and DFS heartbeats so that anything that is deletable is fully deleted
+    do {
+      Thread.sleep(5000);
+    } while (!admin.listSnapshots(snapshotNameAsString).isEmpty());
+
+    try (Table original = UTIL.getConnection().getTable(localTableName)) {
+      try (Table clonedTable = UTIL.getConnection().getTable(cloneTableName)) {
+        // Verify that all regions of both tables are readable
+        final int origTableRowCount = countRows(original);
+        final int clonedTableRowCount = countRows(clonedTable);
+        Assert.assertEquals(origTableRowCount, clonedTableRowCount);
+      }
+    }
+  }
+
+  protected Table createTable(final TableName table, byte[] family) throws Exception {
+    Table t = UTIL.createTable(table, family);
+    // Wait for everything to be ready with the table
+    UTIL.waitUntilAllRegionsAssigned(table);
+
+    // At this point the table should be good to go.
+    return t;
+  }
+
+  protected void loadData(final Table table, byte[]... families) throws Exception {
+    UTIL.loadTable(table, families);
+  }
+
+  protected int countRows(final Table table, final byte[]... families) throws Exception {
+    return UTIL.countRows(table, families);
+  }
+}
\ No newline at end of file