Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/08 01:43:29 UTC

[2/5] hbase git commit: HBASE-19128 Purge Distributed Log Replay from codebase, configurations, text; mark the feature as unsupported, broken.
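
For context: Distributed Log Replay (DLR) recovered a dead server's regions by replaying its WAL edits directly into the re-opened regions (tracked under the recoveringRegionsZNode seen throughout the removed code below) instead of splitting the WAL into per-region recovered.edits files. The removed tests enabled the mode per test instance. A minimal sketch of the now-purged toggle, assuming the pre-HBASE-19128 HConstants key and the imports already present in these test classes:

    Configuration conf = HBaseConfiguration.create();
    // DLR on: recovery replays edits into "recovering" regions rather than
    // writing recovered.edits files. HBASE-19128 deletes this key.
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);

With the feature marked unsupported and broken (HBASE-12751), the diff below drops the toggle, the DLR-only tests, and the RecoveryMode plumbing.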

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index eafc412..3d59639 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -33,12 +33,9 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -67,28 +64,18 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.CompactionState;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.NonceGenerator;
-import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
-import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -110,10 +97,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -145,7 +130,6 @@ public class TestDistributedLogSplitting {
   Configuration conf;
   static Configuration originalConf;
   static HBaseTestingUtility TEST_UTIL;
-  static MiniDFSCluster dfsCluster;
   static MiniZooKeeperCluster zkCluster;
 
   @Rule
@@ -154,7 +138,6 @@ public class TestDistributedLogSplitting {
   @BeforeClass
   public static void setup() throws Exception {
     TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
-    dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
     zkCluster = TEST_UTIL.startMiniZKCluster();
     originalConf = TEST_UTIL.getConfiguration();
   }
@@ -178,7 +161,6 @@ public class TestDistributedLogSplitting {
     conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
     TEST_UTIL.shutdownMiniHBaseCluster();
     TEST_UTIL = new HBaseTestingUtility(conf);
-    TEST_UTIL.setDFSCluster(dfsCluster);
     TEST_UTIL.setZkCluster(zkCluster);
     TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
     cluster = TEST_UTIL.getHBaseCluster();
@@ -211,14 +193,12 @@ public class TestDistributedLogSplitting {
     }
   }
 
-  @Ignore("DLR is broken by HBASE-12751") @Test (timeout=300000)
+  @Test (timeout=300000)
   public void testRecoveredEdits() throws Exception {
-    LOG.info("testRecoveredEdits");
     conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     startCluster(NUM_RS);
 
-    final int NUM_LOG_LINES = 1000;
+    final int NUM_LOG_LINES = 10000;
     final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
@@ -229,23 +209,18 @@ public class TestDistributedLogSplitting {
 
     Path rootdir = FSUtils.getRootDir(conf);
 
+    int numRegions = 50;
     Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null),
-        "table", "family", 40);
+        "table", "family", numRegions);
     try {
       TableName table = t.getName();
       List<RegionInfo> regions = null;
       HRegionServer hrs = null;
       for (int i = 0; i < NUM_RS; i++) {
-        boolean foundRs = false;
         hrs = rsts.get(i).getRegionServer();
         regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        for (RegionInfo region : regions) {
-          if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
-            foundRs = true;
-            break;
-          }
-        }
-        if (foundRs) break;
+        // At least one RS will host >= the average number of regions; see the sketch after this hunk.
+        if (regions.size() >= numRegions/NUM_RS) break;
       }
       final Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(hrs
           .getServerName().toString()));
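
A quick sanity check of the averaging argument in the new server-selection loop above, as a minimal sketch (NUM_RS is the test class's server count; 6 is a hypothetical value for illustration):

    int numRegions = 50, NUM_RS = 6;
    int average = numRegions / NUM_RS;  // 8, by integer division
    // Pigeonhole: if every RS hosted <= 8 regions, the cluster would hold
    // at most 48 < 50 of the table's regions, so at least one RS hosts
    // >= 9 >= average, and the loop always finds a qualifying server.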
@@ -266,11 +241,9 @@ public class TestDistributedLogSplitting {
 
       int count = 0;
       for (RegionInfo hri : regions) {
-
         Path tdir = FSUtils.getTableDir(rootdir, table);
-        Path editsdir =
-            WALSplitter.getRegionDirRecoveredEditsDir(
-                HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(
+            HRegion.getRegionDir(tdir, hri.getEncodedName()));
         LOG.debug("checking edits dir " + editsdir);
         FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
           @Override
@@ -293,195 +266,14 @@ public class TestDistributedLogSplitting {
 
       // check that the log file is moved
       assertFalse(fs.exists(logDir));
-
       assertEquals(NUM_LOG_LINES, count);
     } finally {
       if (t != null) t.close();
     }
   }
 
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testLogReplayWithNonMetaRSDown() throws Exception {
-    LOG.info("testLogReplayWithNonMetaRSDown");
-    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
-    try {
-      HRegionServer hrs = findRSToKill(false, "table");
-      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
-
-      // wait for abort completes
-      this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  private static class NonceGeneratorWithDups implements NonceGenerator {
-
-    private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
-    private boolean isDups = false;
-    private LinkedList<Long> nonces = new LinkedList<>();
-
-    public void startDups() {
-      isDups = true;
-    }
-
-    @Override
-    public long newNonce() {
-      long nonce = isDups ? nonces.removeFirst() : delegate.newNonce();
-      if (!isDups) {
-        nonces.add(nonce);
-      }
-      return nonce;
-    }
-
-    @Override
-    public long getNonceGroup() {
-      return delegate.getNonceGroup();
-    }
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testNonceRecovery() throws Exception {
-    LOG.info("testNonceRecovery");
-    final String TABLE_NAME = "table";
-    final String FAMILY_NAME = "family";
-    final int NUM_REGIONS_TO_CREATE = 40;
-
-    conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    master.balanceSwitch(false);
-
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
-    NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
-    NonceGenerator oldNg =
-        ConnectionUtils.injectNonceGeneratorForTesting(
-            (ClusterConnection)TEST_UTIL.getConnection(), ng);
-
-    try {
-      List<Increment> reqs = new ArrayList<>();
-      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
-        HRegionServer hrs = rst.getRegionServer();
-        List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        for (RegionInfo hri : hris) {
-          if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
-            byte[] key = hri.getStartKey();
-            if (key == null || key.length == 0) {
-              key = Bytes.copy(hri.getEndKey());
-              --(key[key.length - 1]);
-            }
-            Increment incr = new Increment(key);
-            incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
-            ht.increment(incr);
-            reqs.add(incr);
-          }
-        }
-      }
-
-      HRegionServer hrs = findRSToKill(false, "table");
-      abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
-      ng.startDups();
-      for (Increment incr : reqs) {
-        try {
-          ht.increment(incr);
-          fail("should have thrown");
-        } catch (IOException ope) {
-          LOG.debug("Caught as expected: " + ope.getMessage());
-        }
-      }
-    } finally {
-      ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)
-          TEST_UTIL.getConnection(), oldNg);
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testLogReplayWithMetaRSDown() throws Exception {
-    LOG.info("testRecoveredEditsReplayWithMetaRSDown");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
-    try {
-      HRegionServer hrs = findRSToKill(true, "table");
-      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
-
-      this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  private void abortRSAndVerifyRecovery(HRegionServer hrs, Table ht, final ZooKeeperWatcher zkw,
-      final int numRegions, final int numofLines) throws Exception {
-
-    abortRSAndWaitForRecovery(hrs, zkw, numRegions);
-    assertEquals(numofLines, TEST_UTIL.countRows(ht));
-  }
-
-  private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
-      final int numRegions) throws Exception {
-    final MiniHBaseCluster tmpCluster = this.cluster;
-
-    // abort RS
-    LOG.info("Aborting region server: " + hrs.getServerName());
-    hrs.abort("testing");
-
-    // wait for abort completes
-    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
-        return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
-      }
-    });
-
-    // wait for regions come online
-    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
-        return (HBaseTestingUtility.getAllOnlineRegions(tmpCluster).size()
-            >= (numRegions + 1));
-      }
-    });
-
-    // wait for all regions are fully recovered
-    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
-        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
-            zkw.znodePaths.recoveringRegionsZNode, false);
-        return (recoveringRegions != null && recoveringRegions.isEmpty());
-      }
-    });
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
+  @Test(timeout = 300000)
   public void testMasterStartsUpWithLogSplittingWork() throws Exception {
-    LOG.info("testMasterStartsUpWithLogSplittingWork");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
     startCluster(NUM_RS);
 
@@ -536,473 +328,6 @@ public class TestDistributedLogSplitting {
     }
   }
 
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testMasterStartsUpWithLogReplayWork() throws Exception {
-    LOG.info("testMasterStartsUpWithLogReplayWork");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
-    startCluster(NUM_RS);
-
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
-    try {
-      HRegionServer hrs = findRSToKill(false, "table");
-      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
-
-      // abort master
-      abortMaster(cluster);
-
-      // abort RS
-      LOG.info("Aborting region server: " + hrs.getServerName());
-      hrs.abort("testing");
-
-      // wait for the RS dies
-      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
-        }
-      });
-
-      Thread.sleep(2000);
-      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
-
-      // wait for all regions are fully recovered
-      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
-              zkw.znodePaths.recoveringRegionsZNode, false);
-          boolean done = recoveringRegions != null && recoveringRegions.isEmpty();
-          if (!done) {
-            LOG.info("Recovering regions: " + recoveringRegions);
-          }
-          return done;
-        }
-      });
-
-      LOG.info("Current Open Regions After Master Node Starts Up:"
-          + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
-
-      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testLogReplayTwoSequentialRSDown() throws Exception {
-    LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
-    try {
-      List<RegionInfo> regions = null;
-      HRegionServer hrs1 = findRSToKill(false, "table");
-      regions = ProtobufUtil.getOnlineRegions(hrs1.getRSRpcServices());
-
-      makeWAL(hrs1, regions, "table", "family", NUM_LOG_LINES, 100);
-
-      // abort RS1
-      LOG.info("Aborting region server: " + hrs1.getServerName());
-      hrs1.abort("testing");
-
-      // wait for abort completes
-      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
-        }
-      });
-
-      // wait for regions come online
-      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
-              >= (NUM_REGIONS_TO_CREATE + 1));
-        }
-      });
-
-      // sleep a little bit in order to interrupt recovering in the middle
-      Thread.sleep(300);
-      // abort second region server
-      rsts = cluster.getLiveRegionServerThreads();
-      HRegionServer hrs2 = rsts.get(0).getRegionServer();
-      LOG.info("Aborting one more region server: " + hrs2.getServerName());
-      hrs2.abort("testing");
-
-      // wait for abort completes
-      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
-        }
-      });
-
-      // wait for regions come online
-      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
-              >= (NUM_REGIONS_TO_CREATE + 1));
-        }
-      });
-
-      // wait for all regions are fully recovered
-      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
-              zkw.znodePaths.recoveringRegionsZNode, false);
-          return (recoveringRegions != null && recoveringRegions.isEmpty());
-        }
-      });
-
-      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testMarkRegionsRecoveringInZK() throws Exception {
-    LOG.info("testMarkRegionsRecoveringInZK");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    master.balanceSwitch(false);
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    final ZooKeeperWatcher zkw = master.getZooKeeper();
-    Table ht = installTable(zkw, "table", "family", 40);
-    try {
-      final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
-
-      Set<RegionInfo> regionSet = new HashSet<>();
-      RegionInfo region = null;
-      HRegionServer hrs = null;
-      ServerName firstFailedServer = null;
-      ServerName secondFailedServer = null;
-      for (int i = 0; i < NUM_RS; i++) {
-        hrs = rsts.get(i).getRegionServer();
-        List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        if (regions.isEmpty()) continue;
-        region = regions.get(0);
-        regionSet.add(region);
-        firstFailedServer = hrs.getServerName();
-        secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
-        break;
-      }
-
-      slm.markRegionsRecovering(firstFailedServer, regionSet);
-      slm.markRegionsRecovering(secondFailedServer, regionSet);
-
-      List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
-          ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, region.getEncodedName()));
-
-      assertEquals(recoveringRegions.size(), 2);
-
-      // wait for splitLogWorker to mark them up because there is no WAL files recorded in ZK
-      final HRegionServer tmphrs = hrs;
-      TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (tmphrs.getRecoveringRegions().isEmpty());
-        }
-      });
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testReplayCmd() throws Exception {
-    LOG.info("testReplayCmd");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
-    try {
-      List<RegionInfo> regions = null;
-      HRegionServer hrs = null;
-      for (int i = 0; i < NUM_RS; i++) {
-        boolean isCarryingMeta = false;
-        hrs = rsts.get(i).getRegionServer();
-        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        for (RegionInfo region : regions) {
-          if (region.isMetaRegion()) {
-            isCarryingMeta = true;
-            break;
-          }
-        }
-        if (isCarryingMeta) {
-          continue;
-        }
-        if (regions.size() > 0) break;
-      }
-
-      this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
-      String originalCheckSum = TEST_UTIL.checksumRows(ht);
-
-      // abort RA and trigger replay
-      abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
-
-      assertEquals("Data should remain after reopening of regions", originalCheckSum,
-          TEST_UTIL.checksumRows(ht));
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testLogReplayForDisablingTable() throws Exception {
-    LOG.info("testLogReplayForDisablingTable");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);
-    try {
-      // turn off load balancing to prevent regions from moving around otherwise
-      // they will consume recovered.edits
-      master.balanceSwitch(false);
-
-      List<RegionInfo> regions = null;
-      HRegionServer hrs = null;
-      boolean hasRegionsForBothTables = false;
-      String tableName = null;
-      for (int i = 0; i < NUM_RS; i++) {
-        tableName = null;
-        hasRegionsForBothTables = false;
-        boolean isCarryingSystem = false;
-        hrs = rsts.get(i).getRegionServer();
-        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        for (RegionInfo region : regions) {
-          if (region.getTable().isSystemTable()) {
-            isCarryingSystem = true;
-            break;
-          }
-          if (tableName != null &&
-              !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
-            // make sure that we find a RS has online regions for both "table" and "disableTable"
-            hasRegionsForBothTables = true;
-            break;
-          } else if (tableName == null) {
-            tableName = region.getTable().getNameAsString();
-          }
-        }
-        if (isCarryingSystem) {
-          continue;
-        }
-        if (hasRegionsForBothTables) {
-          break;
-        }
-      }
-
-      // make sure we found a good RS
-      Assert.assertTrue(hasRegionsForBothTables);
-
-      LOG.info("#regions = " + regions.size());
-      Iterator<RegionInfo> it = regions.iterator();
-      while (it.hasNext()) {
-        RegionInfo region = it.next();
-        if (region.isMetaRegion()) {
-          it.remove();
-        }
-      }
-      makeWAL(hrs, regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
-      makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
-
-      LOG.info("Disabling table\n");
-      TEST_UTIL.getAdmin().disableTable(TableName.valueOf(name.getMethodName()));
-      TEST_UTIL.waitTableDisabled(TableName.valueOf(name.getMethodName()).getName());
-
-      // abort RS
-      LOG.info("Aborting region server: " + hrs.getServerName());
-      hrs.abort("testing");
-
-      // wait for abort completes
-      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
-        }
-      });
-
-      // wait for regions come online
-      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
-              >= (NUM_REGIONS_TO_CREATE + 1));
-        }
-      });
-
-      // wait for all regions are fully recovered
-      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
-              zkw.znodePaths.recoveringRegionsZNode, false);
-          ServerManager serverManager = master.getServerManager();
-          return (!serverManager.areDeadServersInProgress() &&
-              recoveringRegions != null && recoveringRegions.isEmpty());
-        }
-      });
-
-      int count = 0;
-      FileSystem fs = master.getMasterFileSystem().getFileSystem();
-      Path rootdir = FSUtils.getRootDir(conf);
-      Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf(name.getMethodName()));
-      for (RegionInfo hri : regions) {
-        Path editsdir =
-            WALSplitter.getRegionDirRecoveredEditsDir(
-                HRegion.getRegionDir(tdir, hri.getEncodedName()));
-        LOG.debug("checking edits dir " + editsdir);
-        if(!fs.exists(editsdir)) continue;
-        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
-          @Override
-          public boolean accept(Path p) {
-            if (WALSplitter.isSequenceIdFile(p)) {
-              return false;
-            }
-            return true;
-          }
-        });
-        if(files != null) {
-          for(FileStatus file : files) {
-            int c = countWAL(file.getPath(), fs, conf);
-            count += c;
-            LOG.info(c + " edits in " + file.getPath());
-          }
-        }
-      }
-
-      LOG.info("Verify edits in recovered.edits files");
-      assertEquals(NUM_LOG_LINES, count);
-      LOG.info("Verify replayed edits");
-      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
-
-      // clean up
-      for (RegionInfo hri : regions) {
-        Path editsdir =
-            WALSplitter.getRegionDirRecoveredEditsDir(
-                HRegion.getRegionDir(tdir, hri.getEncodedName()));
-        fs.delete(editsdir, true);
-      }
-      disablingHT.close();
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testDisallowWritesInRecovering() throws Exception {
-    LOG.info("testDisallowWritesInRecovering");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
-    conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
-    startCluster(NUM_RS);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
-    try {
-      final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
-
-      Set<RegionInfo> regionSet = new HashSet<>();
-      RegionInfo region = null;
-      HRegionServer hrs = null;
-      HRegionServer dstRS = null;
-      for (int i = 0; i < NUM_RS; i++) {
-        hrs = rsts.get(i).getRegionServer();
-        List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        if (regions.isEmpty()) continue;
-        region = regions.get(0);
-        regionSet.add(region);
-        dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
-        break;
-      }
-
-      slm.markRegionsRecovering(hrs.getServerName(), regionSet);
-      // move region in order for the region opened in recovering state
-      final RegionInfo hri = region;
-      final HRegionServer tmpRS = dstRS;
-      TEST_UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
-          Bytes.toBytes(dstRS.getServerName().getServerName()));
-      // wait for region move completes
-      final RegionStates regionStates =
-          TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
-      TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          ServerName sn = regionStates.getRegionServerOfRegion(hri);
-          return (sn != null && sn.equals(tmpRS.getServerName()));
-        }
-      });
-
-      try {
-        byte[] key = region.getStartKey();
-        if (key == null || key.length == 0) {
-          key = new byte[] { 0, 0, 0, 0, 1 };
-        }
-        Put put = new Put(key);
-        put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
-        ht.put(put);
-      } catch (IOException ioe) {
-        Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
-        RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
-        boolean foundRegionInRecoveryException = false;
-        for (Throwable t : re.getCauses()) {
-          if (t instanceof RegionInRecoveryException) {
-            foundRegionInRecoveryException = true;
-            break;
-          }
-        }
-        Assert.assertTrue(
-            "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
-            foundRegionInRecoveryException);
-      }
-    } finally {
-      if (ht != null) ht.close();
-      if (ht != null) zkw.close();
-    }
-  }
-
   /**
    * The original intention of this test was to force an abort of a region
    * server and to make sure that the failure path in the region servers is
@@ -1012,7 +337,8 @@ public class TestDistributedLogSplitting {
    * detects that the region server has aborted.
    * @throws Exception
    */
-  @Ignore ("Disabled because flakey") @Test (timeout=300000)
+  // Was marked flaky before Distributed Log Replay cleanup.
+  @Test (timeout=300000)
   public void testWorkerAbort() throws Exception {
     LOG.info("testWorkerAbort");
     startCluster(3);
@@ -1112,16 +438,6 @@ public class TestDistributedLogSplitting {
         Thread.sleep(200);
       }
 
-      // wait for all regions are fully recovered
-      TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
-              zkw.znodePaths.recoveringRegionsZNode, false);
-          return (recoveringRegions != null && recoveringRegions.isEmpty());
-        }
-      });
-
       assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
           TEST_UTIL.countRows(ht));
     } finally {
@@ -1200,255 +516,6 @@ public class TestDistributedLogSplitting {
     }
   }
 
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testMetaRecoveryInZK() throws Exception {
-    LOG.info("testMetaRecoveryInZK");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-
-    // only testing meta recovery in ZK operation
-    HRegionServer hrs = findRSToKill(true, null);
-    List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-
-    LOG.info("#regions = " + regions.size());
-    Set<RegionInfo> tmpRegions = new HashSet<>();
-    tmpRegions.add(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), tmpRegions);
-    Set<RegionInfo> userRegionSet = new HashSet<>();
-    userRegionSet.addAll(regions);
-    master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), userRegionSet);
-    boolean isMetaRegionInRecovery = false;
-    List<String> recoveringRegions =
-        zkw.getRecoverableZooKeeper().getChildren(zkw.znodePaths.recoveringRegionsZNode, false);
-    for (String curEncodedRegionName : recoveringRegions) {
-      if (curEncodedRegionName.equals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
-        isMetaRegionInRecovery = true;
-        break;
-      }
-    }
-    assertTrue(isMetaRegionInRecovery);
-
-    master.getMasterWalManager().splitMetaLog(hrs.getServerName());
-
-    isMetaRegionInRecovery = false;
-    recoveringRegions =
-        zkw.getRecoverableZooKeeper().getChildren(zkw.znodePaths.recoveringRegionsZNode, false);
-    for (String curEncodedRegionName : recoveringRegions) {
-      if (curEncodedRegionName.equals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
-        isMetaRegionInRecovery = true;
-        break;
-      }
-    }
-    // meta region should be recovered
-    assertFalse(isMetaRegionInRecovery);
-    zkw.close();
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testSameVersionUpdatesRecovery() throws Exception {
-    LOG.info("testSameVersionUpdatesRecovery");
-    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    startCluster(NUM_RS);
-    final AtomicLong sequenceId = new AtomicLong(100);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, name.getMethodName(), "family", NUM_REGIONS_TO_CREATE);
-    try {
-      List<RegionInfo> regions = null;
-      HRegionServer hrs = null;
-      for (int i = 0; i < NUM_RS; i++) {
-        boolean isCarryingMeta = false;
-        hrs = rsts.get(i).getRegionServer();
-        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        for (RegionInfo region : regions) {
-          if (region.isMetaRegion()) {
-            isCarryingMeta = true;
-            break;
-          }
-        }
-        if (isCarryingMeta) {
-          continue;
-        }
-        break;
-      }
-
-      LOG.info("#regions = " + regions.size());
-      Iterator<RegionInfo> it = regions.iterator();
-      while (it.hasNext()) {
-        RegionInfo region = it.next();
-        if (region.isMetaRegion()
-            || region.getEncodedName().equals(
-            RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
-          it.remove();
-        }
-      }
-      if (regions.isEmpty()) return;
-      RegionInfo curRegionInfo = regions.get(0);
-      byte[] startRow = curRegionInfo.getStartKey();
-      if (startRow == null || startRow.length == 0) {
-        startRow = new byte[] { 0, 0, 0, 0, 1 };
-      }
-      byte[] row = Bytes.incrementBytes(startRow, 1);
-      // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
-      row = Arrays.copyOfRange(row, 3, 8);
-      long value = 0;
-      TableName tableName = TableName.valueOf(name.getMethodName());
-      byte[] family = Bytes.toBytes("family");
-      byte[] qualifier = Bytes.toBytes("c1");
-      long timeStamp = System.currentTimeMillis();
-      HTableDescriptor htd = new HTableDescriptor(tableName);
-      htd.addFamily(new HColumnDescriptor(family));
-      final WAL wal = hrs.getWAL(curRegionInfo);
-      for (int i = 0; i < NUM_LOG_LINES; i += 1) {
-        WALEdit e = new WALEdit();
-        value++;
-        e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
-        wal.append(curRegionInfo, new WALKey(curRegionInfo.getEncodedNameAsBytes(), tableName,
-            System.currentTimeMillis()), e, true);
-      }
-      wal.sync();
-      wal.shutdown();
-
-      // wait for abort completes
-      this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
-
-      // verify we got the last value
-      LOG.info("Verification Starts...");
-      Get g = new Get(row);
-      Result r = ht.get(g);
-      long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
-      assertEquals(value, theStoredVal);
-
-      // after flush
-      LOG.info("Verification after flush...");
-      TEST_UTIL.getAdmin().flush(tableName);
-      r = ht.get(g);
-      theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
-      assertEquals(value, theStoredVal);
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-  public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
-    LOG.info("testSameVersionUpdatesRecoveryWithWrites");
-    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
-    conf.setInt("hbase.hstore.compactionThreshold", 3);
-    startCluster(NUM_RS);
-    final AtomicLong sequenceId = new AtomicLong(100);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 2000;
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
-    try {
-      List<RegionInfo> regions = null;
-      HRegionServer hrs = null;
-      for (int i = 0; i < NUM_RS; i++) {
-        boolean isCarryingMeta = false;
-        hrs = rsts.get(i).getRegionServer();
-        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        for (RegionInfo region : regions) {
-          if (region.isMetaRegion()) {
-            isCarryingMeta = true;
-            break;
-          }
-        }
-        if (isCarryingMeta) {
-          continue;
-        }
-        break;
-      }
-
-      LOG.info("#regions = " + regions.size());
-      Iterator<RegionInfo> it = regions.iterator();
-      while (it.hasNext()) {
-        RegionInfo region = it.next();
-        if (region.isMetaRegion()
-            || region.getEncodedName().equals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
-          it.remove();
-        }
-      }
-      if (regions.isEmpty()) return;
-      RegionInfo curRegionInfo = regions.get(0);
-      byte[] startRow = curRegionInfo.getStartKey();
-      if (startRow == null || startRow.length == 0) {
-        startRow = new byte[] { 0, 0, 0, 0, 1 };
-      }
-      byte[] row = Bytes.incrementBytes(startRow, 1);
-      // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
-      row = Arrays.copyOfRange(row, 3, 8);
-      long value = 0;
-      final TableName tableName = TableName.valueOf(name.getMethodName());
-      byte[] family = Bytes.toBytes("family");
-      byte[] qualifier = Bytes.toBytes("c1");
-      long timeStamp = System.currentTimeMillis();
-      HTableDescriptor htd = new HTableDescriptor(tableName);
-      htd.addFamily(new HColumnDescriptor(family));
-      final WAL wal = hrs.getWAL(curRegionInfo);
-      for (int i = 0; i < NUM_LOG_LINES; i += 1) {
-        WALEdit e = new WALEdit();
-        value++;
-        e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
-        wal.append(curRegionInfo, new WALKey(curRegionInfo.getEncodedNameAsBytes(),
-            tableName, System.currentTimeMillis()), e, true);
-      }
-      wal.sync();
-      wal.shutdown();
-
-      // wait for abort completes
-      this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
-
-      // verify we got the last value
-      LOG.info("Verification Starts...");
-      Get g = new Get(row);
-      Result r = ht.get(g);
-      long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
-      assertEquals(value, theStoredVal);
-
-      // after flush & compaction
-      LOG.info("Verification after flush...");
-      TEST_UTIL.getAdmin().flush(tableName);
-      TEST_UTIL.getAdmin().compact(tableName);
-
-      // wait for compaction completes
-      TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return (TEST_UTIL.getAdmin()
-              .getCompactionState(tableName) == CompactionState.NONE);
-        }
-      });
-
-      r = ht.get(g);
-      theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
-      assertEquals(value, theStoredVal);
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
-    }
-  }
-
   @Test(timeout = 300000)
   public void testReadWriteSeqIdFiles() throws Exception {
     LOG.info("testReadWriteSeqIdFiles");
@@ -1578,8 +645,6 @@ public class TestDistributedLogSplitting {
     TableName fullTName = TableName.valueOf(tname);
     // remove root and meta region
     regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    // using one sequenceId for edits across all regions is ok.
-    final AtomicLong sequenceId = new AtomicLong(10);
 
 
     for(Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
@@ -1608,6 +673,7 @@ public class TestDistributedLogSplitting {
     int[] counts = new int[n];
     // sync every ~30k to line up with desired wal rolls
     final int syncEvery = 30 * 1024 / edit_size;
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     if (n > 0) {
       for (int i = 0; i < num_edits; i += 1) {
         WALEdit e = new WALEdit();
@@ -1619,13 +685,12 @@ public class TestDistributedLogSplitting {
         }
         byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
         row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
-        // HBaseTestingUtility.createMultiRegions use 5 bytes
-        // key
+        // HBaseTestingUtility.createMultiRegions uses a 5-byte key
         byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
         e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
         log.append(curRegionInfo,
             new WALKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
-                System.currentTimeMillis()), e, true);
+                System.currentTimeMillis(), mvcc), e, true);
         if (0 == i % syncEvery) {
           log.sync();
         }
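
The hunk above is the substance of the change to the test's WAL-writing helper: each appended WALKey now carries the shared MultiVersionConcurrencyControl declared earlier, which the WAL uses to stamp sequence ids on append. A condensed sketch of the new append path, reusing row, family, qualifier, value, curRegionInfo, fullTName, and the open WAL log from the surrounding method:

    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    WALEdit e = new WALEdit();
    e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
    // The four-argument WALKey constructor threads the mvcc into the append.
    log.append(curRegionInfo,
        new WALKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
            System.currentTimeMillis(), mvcc),
        e, true);
    log.sync();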
@@ -1688,37 +753,6 @@ public class TestDistributedLogSplitting {
     }
   }
 
-  /**
-   * Load table with puts and deletes with expected values so that we can verify later
-   */
-  private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException {
-    byte[] k = new byte[3];
-
-    // add puts
-    List<Put> puts = new ArrayList<>();
-    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
-      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
-        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
-          k[0] = b1;
-          k[1] = b2;
-          k[2] = b3;
-          Put put = new Put(k);
-          put.addColumn(f, column, k);
-          puts.add(put);
-        }
-      }
-    }
-    t.put(puts);
-    // add deletes
-    for (byte b3 = 'a'; b3 <= 'z'; b3++) {
-      k[0] = 'a';
-      k[1] = 'a';
-      k[2] = b3;
-      Delete del = new Delete(k);
-      t.delete(del);
-    }
-  }
-
   private void waitForCounter(LongAdder ctr, long oldval, long newval,
       long timems) {
     long curt = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java
deleted file mode 100644
index 8641b20..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogTask;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Test the master wal manager in a local cluster
- */
-@Category({MasterTests.class, MediumTests.class})
-public class TestMasterWalManager {
-  private static final Log LOG = LogFactory.getLog(TestMasterWalManager.class);
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void teardownTest() throws Exception {
-    UTIL.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testRemoveStaleRecoveringRegionsDuringMasterInitialization() throws Exception {
-    // this test is for when distributed log replay is enabled
-    if (!UTIL.getConfiguration().getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false)) return;
-
-    LOG.info("Starting testRemoveStaleRecoveringRegionsDuringMasterInitialization");
-    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
-    MasterWalManager mwm = master.getMasterWalManager();
-
-    String failedRegion = "failedRegoin1";
-    String staleRegion = "staleRegion";
-    ServerName inRecoveryServerName = ServerName.valueOf("mgr,1,1");
-    ServerName previouselyFaildServerName = ServerName.valueOf("previous,1,1");
-    String walPath = "/hbase/data/.logs/" + inRecoveryServerName.getServerName()
-        + "-splitting/test";
-    // Create a ZKW to use in the test
-    ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
-    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath),
-      new SplitLogTask.Owned(inRecoveryServerName).toByteArray(),
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    String staleRegionPath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, staleRegion);
-    ZKUtil.createWithParents(zkw, staleRegionPath);
-    String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode,
-      failedRegion);
-    inRecoveringRegionPath = ZKUtil.joinZNode(inRecoveringRegionPath,
-      inRecoveryServerName.getServerName());
-    ZKUtil.createWithParents(zkw, inRecoveringRegionPath);
-    Set<ServerName> servers = new HashSet<>();
-    servers.add(previouselyFaildServerName);
-    mwm.removeStaleRecoveringRegionsFromZK(servers);
-
-    // verification
-    assertFalse(ZKUtil.checkExists(zkw, staleRegionPath) != -1);
-    assertTrue(ZKUtil.checkExists(zkw, inRecoveringRegionPath) != -1);
-
-    ZKUtil.deleteChildrenRecursively(zkw, zkw.znodePaths.recoveringRegionsZNode);
-    ZKUtil.deleteChildrenRecursively(zkw, zkw.znodePaths.splitLogZNode);
-    zkw.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index 3dcd849..ca64326 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -36,7 +36,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.LongAdder;
@@ -49,7 +48,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
@@ -59,7 +57,6 @@ import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
@@ -73,7 +70,6 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
@@ -93,7 +89,6 @@ public class TestSplitLogManager {
   private SplitLogManager slm;
   private Configuration conf;
   private int to;
-  private RecoveryMode mode;
 
   private static HBaseTestingUtility TEST_UTIL;
 
@@ -153,10 +148,6 @@ public class TestSplitLogManager {
 
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
     to = to + 16 * 100;
-
-    this.mode =
-        (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
-            : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -245,7 +236,7 @@ public class TestSplitLogManager {
     LOG.info("TestOrphanTaskAcquisition");
 
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
-    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName(), this.mode);
+    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -270,7 +261,7 @@ public class TestSplitLogManager {
         " startup");
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
     //create an unassigned orphan task
-    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName(), this.mode);
+    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
@@ -305,19 +296,19 @@ public class TestSplitLogManager {
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
     final ServerName worker2 = ServerName.valueOf("worker2,1,1");
     final ServerName worker3 = ServerName.valueOf("worker3,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
     int version1 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version1 > version);
-    slt = new SplitLogTask.Owned(worker2, this.mode);
+    slt = new SplitLogTask.Owned(worker2);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
     waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
     int version2 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version2 > version1);
-    slt = new SplitLogTask.Owned(worker3, this.mode);
+    slt = new SplitLogTask.Owned(worker3);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
@@ -335,7 +326,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     int version = ZKUtil.checkExists(zkw, tasknode);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     waitForCounter(new Expr() {
@@ -362,7 +353,7 @@ public class TestSplitLogManager {
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Done(worker1);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     synchronized (batch) {
       while (batch.installed != batch.done) {
@@ -383,7 +374,7 @@ public class TestSplitLogManager {
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Err(worker1);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
 
     synchronized (batch) {
@@ -407,7 +398,7 @@ public class TestSplitLogManager {
     assertEquals(tot_mgr_resubmit.sum(), 0);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
     assertEquals(tot_mgr_resubmit.sum(), 0);
-    SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
     assertEquals(tot_mgr_resubmit.sum(), 0);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     ZKUtil.checkExists(zkw, tasknode);
@@ -430,7 +421,7 @@ public class TestSplitLogManager {
     // create an orphan task in OWNED state
     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1);
     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -445,7 +436,7 @@ public class TestSplitLogManager {
     for (int i = 0; i < (3 * to)/100; i++) {
       Thread.sleep(100);
       final ServerName worker2 = ServerName.valueOf("worker1,1,1");
-      slt = new SplitLogTask.Owned(worker2, this.mode);
+      slt = new SplitLogTask.Owned(worker2);
       ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
     }
 
@@ -469,7 +460,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     int version = ZKUtil.checkExists(zkw, tasknode);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     slm.handleDeadWorker(worker1);
@@ -494,7 +485,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
 
-    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
 
@@ -544,7 +535,7 @@ public class TestSplitLogManager {
         while (!done) {
           for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
             final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-            SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING);
+            SplitLogTask slt = new SplitLogTask.Done(worker1);
             boolean encounteredZKException = false;
             try {
               ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
@@ -564,50 +555,4 @@ public class TestSplitLogManager {
 
     assertFalse(fs.exists(logDirPath));
   }
-
-  /**
-   * The following test case verifies that when distributedLogReplay is turned off
-   * and the cluster is restarted, there are no recovering regions left in ZK.
-   * @throws Exception
-   */
-  @Test(timeout = 300000)
-  public void testRecoveryRegionRemovedFromZK() throws Exception {
-    LOG.info("testRecoveryRegionRemovedFromZK");
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
-    String nodePath =
-        ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode,
-          HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
-    ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
-
-    slm = new SplitLogManager(master, conf);
-    slm.removeStaleRecoveringRegions(null);
-
-    List<String> recoveringRegions =
-        zkw.getRecoverableZooKeeper().getChildren(zkw.znodePaths.recoveringRegionsZNode, false);
-
-    assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
-  }
-
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout=60000)
-  public void testGetPreviousRecoveryMode() throws Exception {
-    LOG.info("testGetPreviousRecoveryMode");
-    SplitLogCounters.resetCounters();
-    // Not actually enabling DLR for the cluster, just for the ZkCoordinatedStateManager to use.
-    // The test is just manipulating ZK manually anyway.
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-
-    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
-      new SplitLogTask.Unassigned(
-        ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-    slm = new SplitLogManager(master, conf);
-    LOG.info("Mode1=" + slm.getRecoveryMode());
-    assertTrue(slm.isLogSplitting());
-    zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
-    LOG.info("Mode2=" + slm.getRecoveryMode());
-    slm.setRecoveryMode(false);
-    LOG.info("Mode3=" + slm.getRecoveryMode());
-    assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying());
-  }
 }
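
For reference, the churn above is mechanical: every SplitLogTask constructor loses its
RecoveryMode argument now that log splitting is the only recovery mode. A minimal sketch
of the task lifecycle under the new API, using only calls that appear verbatim in these
hunks (zkw, ZKUtil and the ZK ACL/mode constants are assumed to come from the
surrounding test fixture):

    // Sketch only (post-HBASE-19128): publish, claim and read back a task.
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "example-wal");
    SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    // A worker claims the task by rewriting it as Owned; terminal states are
    // Done, Err or Resigned, each constructed from the server name alone.
    ZKUtil.setData(zkw, tasknode,
        new SplitLogTask.Owned(ServerName.valueOf("worker1,1,1")).toByteArray());
    SplitLogTask readBack = SplitLogTask.parseFrom(ZKUtil.getData(zkw, tasknode));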

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index fec7151..127f949 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -48,7 +48,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -134,7 +133,6 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
-import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
@@ -176,7 +174,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -5902,93 +5899,6 @@ public class TestHRegion {
     }
   }
 
-  @Test
-  public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
-    // similar to the above test but with distributed log replay
-    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
-    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
-
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
-    htd.addFamily(new HColumnDescriptor(fam1));
-    htd.addFamily(new HColumnDescriptor(fam2));
-
-    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
-      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
-
-    // open the region w/o rss and wal and flush some files
-    HRegion region =
-         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
-             .getConfiguration(), htd);
-    assertNotNull(region);
-
-    // create a file in fam1 for the region before opening in OpenRegionHandler
-    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
-    region.flush(true);
-    HBaseTestingUtility.closeRegionAndWAL(region);
-
-    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
-
-    // capture append() calls
-    WAL wal = mockWAL();
-    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
-
-    // add the region to recovering regions
-    HashMap<String, HRegion> recoveringRegions = Maps.newHashMap();
-    recoveringRegions.put(region.getRegionInfo().getEncodedName(), null);
-    when(rss.getRecoveringRegions()).thenReturn(recoveringRegions);
-
-    try {
-      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
-      conf.set(HConstants.REGION_IMPL, HRegionWithSeqId.class.getName());
-      region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
-        conf, rss, null);
-
-      // verify that we have not appended a region open event to the WAL because this
-      // region is still recovering
-      verify(wal, times(0)).append((HRegionInfo)any(), (WALKey)any()
-        , editCaptor.capture(), anyBoolean());
-
-      // now put the region out of recovering state
-      new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo")
-        .prepare().process();
-
-      // now we should have put the entry
-      verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
-        , editCaptor.capture(), anyBoolean());
-
-      WALEdit edit = editCaptor.getValue();
-      assertNotNull(edit);
-      assertNotNull(edit.getCells());
-      assertEquals(1, edit.getCells().size());
-      RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
-      assertNotNull(desc);
-
-      LOG.info("RegionEventDescriptor from WAL: " + desc);
-
-      assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
-      assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
-      assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
-        hri.getEncodedNameAsBytes()));
-      assertTrue(desc.getLogSequenceNumber() > 0);
-      assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
-      assertEquals(2, desc.getStoresCount());
-
-      StoreDescriptor store = desc.getStores(0);
-      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
-      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
-      assertEquals(1, store.getStoreFileCount()); // 1 store file
-      assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
-
-      store = desc.getStores(1);
-      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
-      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
-      assertEquals(0, store.getStoreFileCount()); // no store files
-
-    } finally {
-      HBaseTestingUtility.closeRegionAndWAL(region);
-    }
-  }
-
   /**
   * Utility method to set up a WAL mock.
    * Needs to do the bit where we close latch on the WALKey on append else test hangs.
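
A note on the deletion above: the log-replay variant of the open-marker test is gone,
but its non-DLR sibling (the one the deleted comment calls "the above test") still
verifies the marker. A hedged sketch of that verification, reassembled from the exact
mock calls in the deleted body; names like rss, hri, htd, conf and mockWAL() belong to
this test class:

    // Sketch only: without recovery in the picture, opening the region
    // appends the REGION_OPEN event to the WAL immediately.
    WAL wal = mockWAL();
    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
    HRegion region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), conf, rss, null);
    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
    verify(wal, times(1)).append((HRegionInfo) any(), (WALKey) any(),
        editCaptor.capture(), anyBoolean());
    RegionEventDescriptor desc =
        WALEdit.getRegionEventDescriptor(editCaptor.getValue().getCells().get(0));
    assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());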

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 49a61c5..b8155e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -417,20 +416,9 @@ public class TestPerColumnFamilyFlush {
     }
   }
 
-  // Test Log Replay with Distributed Replay on.
-  // In distributed log replay, the log splitters ask the master for the
-  // last flushed sequence id for a region. This test would ensure that we
-  // are doing the book-keeping correctly.
-  @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 180000)
-  public void testLogReplayWithDistributedReplay() throws Exception {
-    TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    doTestLogReplay();
-  }
-
   // Test Log Replay with Distributed log split on.
   @Test(timeout = 180000)
   public void testLogReplayWithDistributedLogSplit() throws Exception {
-    TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     doTestLogReplay();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
index ffcc5c0..7a6e2fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
@@ -22,8 +22,6 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -57,14 +55,10 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Tests failover of secondary region replicas.
  */
-@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestRegionReplicaFailover {
 
@@ -90,19 +84,6 @@ public class TestRegionReplicaFailover {
 
   private HTableDescriptor htd;
 
-  /*
-   * We are testing with dist log split and dist log replay separately
-   */
-  @Parameters
-  public static Collection<Object[]> getParameters() {
-    Object[][] params =
-        new Boolean[][] { /*{true}, Disable DLR!!! It is going to be removed*/ {false} };
-    return Arrays.asList(params);
-  }
-
-  @Parameterized.Parameter(0)
-  public boolean distributedLogReplay;
-
   @Before
   public void before() throws Exception {
     Configuration conf = HTU.getConfiguration();
@@ -112,7 +93,6 @@ public class TestRegionReplicaFailover {
     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
     conf.setInt("replication.stats.thread.period.seconds", 5);
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
 
     HTU.startMiniCluster(NB_SERVERS);
     htd = HTU.createTableDescriptor(

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index f8b9f6e..1f7320e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -153,7 +153,7 @@ public class TestRegionServerNoMaster {
   public static void openRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri)
       throws Exception {
     AdminProtos.OpenRegionRequest orr =
-      RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, null, null);
+      RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, null);
     AdminProtos.OpenRegionResponse responseOpen = rs.rpcServices.openRegion(null, orr);
 
     Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
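
Worth calling out in this hunk: buildOpenRegionRequest() drops its trailing argument
(the DLR open-for-replay flag, by my reading; the hunk itself only shows the arity
change). A minimal sketch of opening a region through the admin RPC with the
three-argument builder, reusing rs and hri from this test:

    // Sketch only: the remaining null is the optional favored-nodes argument
    // (an assumption; the call shape itself is verbatim from the hunk above).
    AdminProtos.OpenRegionRequest orr =
        RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, null);
    AdminProtos.OpenRegionResponse resp = rs.rpcServices.openRegion(null, orr);
    Assert.assertTrue(resp.getOpeningStateCount() == 1);
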
@@ -295,7 +295,7 @@ public class TestRegionServerNoMaster {
     closeRegionNoZK();
     try {
       AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
-        earlierServerName, hri, null, null);
+        earlierServerName, hri, null);
       getRS().getRSRpcServices().openRegion(null, orr);
       Assert.fail("The openRegion should have been rejected");
     } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 40077f9..3eec9d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
@@ -47,7 +46,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -78,7 +76,6 @@ public class TestSplitLogWorker {
   private ZooKeeperWatcher zkw;
   private SplitLogWorker slw;
   private ExecutorService executorService;
-  private RecoveryMode mode;
 
   class DummyServer implements Server {
     private ZooKeeperWatcher zkw;
@@ -212,8 +209,6 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     executorService = new ExecutorService("TestSplitLogWorker");
     executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
-    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
-        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -228,7 +223,7 @@ public class TestSplitLogWorker {
     new SplitLogWorker.TaskExecutor() {
 
       @Override
-      public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
+      public Status exec(String name, CancelableProgressable p) {
         while (true) {
           try {
             Thread.sleep(1000);
@@ -251,7 +246,7 @@ public class TestSplitLogWorker {
     final ServerName RS = ServerName.valueOf("rs,1,1");
     RegionServerServices mockedRS = getRegionServer(RS);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
-      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
         Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -287,7 +282,7 @@ public class TestSplitLogWorker {
     final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
     final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
-      new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
+      new SplitLogTask.Unassigned(MANAGER).toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     RegionServerServices mockedRS1 = getRegionServer(SVR1);
     RegionServerServices mockedRS2 = getRegionServer(SVR2);
@@ -330,7 +325,7 @@ public class TestSplitLogWorker {
 
       // this time create a task node after starting the splitLogWorker
       zkw.getRecoverableZooKeeper().create(PATH,
-        new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
+        new SplitLogTask.Unassigned(MANAGER).toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
@@ -338,7 +333,7 @@ public class TestSplitLogWorker {
       byte [] bytes = ZKUtil.getData(zkw, PATH);
       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
       assertTrue(slt.isOwned(SRV));
-      slt = new SplitLogTask.Owned(MANAGER, this.mode);
+      slt = new SplitLogTask.Owned(MANAGER);
       ZKUtil.setData(zkw, PATH, slt.toByteArray());
       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
     } finally {
@@ -362,7 +357,7 @@ public class TestSplitLogWorker {
       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
 
       SplitLogTask unassignedManager =
-        new SplitLogTask.Unassigned(MANAGER, this.mode);
+        new SplitLogTask.Unassigned(MANAGER);
       zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
@@ -376,7 +371,7 @@ public class TestSplitLogWorker {
 
       // preempt the first task, have it owned by another worker
       final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
-      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
+      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
       ZKUtil.setData(zkw, PATH1, slt.toByteArray());
       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
 
@@ -402,7 +397,7 @@ public class TestSplitLogWorker {
     Thread.sleep(100);
 
     String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
-    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
+    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
     zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
       CreateMode.PERSISTENT);
 
@@ -454,7 +449,7 @@ public class TestSplitLogWorker {
     RegionServerServices mockedRS = getRegionServer(RS);
     for (int i = 0; i < maxTasks; i++) {
       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
@@ -497,7 +492,7 @@ public class TestSplitLogWorker {
 
     for (int i = 0; i < maxTasks; i++) {
       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
@@ -521,8 +516,6 @@ public class TestSplitLogWorker {
 
   /**
    * Create a mocked region server service instance
-   * @param server
-   * @return
    */
   private RegionServerServices getRegionServer(ServerName name) {
 

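Also in this file, SplitLogWorker.TaskExecutor.exec() loses its RecoveryMode parameter,
as the first hunk above shows. A minimal sketch of a stub executor under the
two-argument interface (the Status values are an assumption drawn from the enum's use
elsewhere in HBase, not from this diff):

    // Sketch only: a task executor that idles until preempted or cancelled.
    SplitLogWorker.TaskExecutor neverEnding = new SplitLogWorker.TaskExecutor() {
      @Override
      public Status exec(String name, CancelableProgressable p) {
        while (p.progress()) {      // report liveness; false means stop
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            return Status.PREEMPTED;
          }
        }
        return Status.RESIGNED;
      }
    };
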
http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
index 99cf91d..4db5734 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
@@ -79,7 +79,6 @@ public class TestSplitWalDataLoss {
   @Before
   public void setUp() throws Exception {
     testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
-    testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     testUtil.startMiniCluster(2);
     Admin admin = testUtil.getAdmin();
     admin.createNamespace(namespace);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 283b85d..5acbf23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -112,8 +112,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-
 /**
  * Test replay of edits out of a WAL split.
  */
@@ -127,7 +125,6 @@ public abstract class AbstractTestWALReplay {
   private Path logDir;
   private FileSystem fs;
   private Configuration conf;
-  private RecoveryMode mode;
   private WALFactory wals;
 
   @Rule
@@ -165,9 +162,6 @@ public abstract class AbstractTestWALReplay {
     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
     }
-    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-            HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG) ?
-        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     this.wals = new WALFactory(conf, null, currentTest.getMethodName());
   }
 
@@ -908,7 +902,7 @@ public abstract class AbstractTestWALReplay {
     assertNotNull(listStatus);
     assertTrue(listStatus.length > 0);
     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
-        this.fs, this.conf, null, null, null, null, mode, wals);
+        this.fs, this.conf, null, null, null, wals);
     FileStatus[] listStatus1 = this.fs.listStatus(
       new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
           "recovered.edits")), new PathFilter() {
@@ -1062,10 +1056,8 @@ public abstract class AbstractTestWALReplay {
       first = fs.getFileStatus(smallFile);
       second = fs.getFileStatus(largeFile);
     }
-    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, null,
-      RecoveryMode.LOG_SPLITTING, wals);
-    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, null,
-      RecoveryMode.LOG_SPLITTING, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals);
     WAL wal = createWAL(this.conf, hbaseRootDir, logName);
     region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
     assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
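
The WALSplitter change mirrors the rest of the purge: splitLogFile() goes from a
RecoveryMode-carrying signature to an eight-argument one, as both hunks above show. A
minimal sketch of the new call shape; reading the three nulls as the optional reporter,
sequence-id checker and coordination hooks is my interpretation, not something the diff
states:

    // Sketch only: split every WAL file under logDir with the new signature.
    FileStatus[] logfiles = fs.listStatus(logDir);
    for (FileStatus logfile : logfiles) {
      WALSplitter.splitLogFile(hbaseRootDir, logfile, fs, conf,
          null, null, null, wals);  // optional collaborators, unused in tests
    }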

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 1089b7a..45682fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -276,56 +276,4 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     closeRegion(HTU, rs0, hriSecondary);
     connection.close();
   }
-
-  @Test (timeout = 240000)
-  public void testReplayedEditsAreSkipped() throws Exception {
-    openRegion(HTU, rs0, hriSecondary);
-    ClusterConnection connection =
-        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
-    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
-
-    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
-    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
-    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
-
-    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
-    when(mockPeer.getNamespaces()).thenReturn(null);
-    when(mockPeer.getTableCFs()).thenReturn(null);
-    when(mockPeer.getPeerConfig()).thenReturn(new ReplicationPeerConfig());
-    when(context.getReplicationPeer()).thenReturn(mockPeer);
-
-    replicator.init(context);
-    replicator.startAsync();
-
-    // test the filter for the RE, not actual replication
-    WALEntryFilter filter = replicator.getWALEntryfilter();
-
-    //load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
-
-    Assert.assertEquals(1000, entries.size());
-    for (Entry e: entries) {
-      Cell _c = e.getEdit().getCells().get(0);
-      if (Integer.parseInt(
-        Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength())) % 2 == 0) {
-        e.getKey().setOrigLogSeqNum(1); // simulate dist log replay by setting orig seq id
-      }
-    }
-
-    long skipped = 0, replayed = 0;
-    for (Entry e : entries) {
-      if (filter.filter(e) == null) {
-        skipped++;
-      } else {
-        replayed++;
-      }
-    }
-
-    assertEquals(500, skipped);
-    assertEquals(500, replayed);
-
-    HTU.deleteNumericRows(table, f, 0, 1000);
-    closeRegion(HTU, rs0, hriSecondary);
-    connection.close();
-  }
 }