Posted to commits@hbase.apache.org by zh...@apache.org on 2023/05/15 14:54:03 UTC

[hbase] 03/16: HBASE-27214 Implement the new replication hfile/log cleaner (#4722)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6d0311c1d968de2750e9efacaaf5f71fb3bb2108
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Aug 31 21:24:09 2022 +0800

    HBASE-27214 Implement the new replication hfile/log cleaner (#4722)
    
    Signed-off-by: Xin Sun <dd...@gmail.com>
---
 .../hbase/master/cleaner/FileCleanerDelegate.java  |   2 +-
 .../hadoop/hbase/master/region/MasterRegion.java   |   2 +-
 .../hbase/master/replication/AddPeerProcedure.java |  21 +-
 .../master/replication/ReplicationPeerManager.java |   8 +
 .../hadoop/hbase/regionserver/HRegionServer.java   |   2 +-
 .../hbase/replication/ReplicationOffsetUtil.java   |  47 +++
 .../replication/master/ReplicationLogCleaner.java  | 234 +++++++++----
 .../master/ReplicationLogCleanerBarrier.java       |  85 +++++
 .../regionserver/ReplicationSourceManager.java     |  18 +-
 .../regionserver/ReplicationSyncUp.java            |   5 +-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |  29 ++
 .../org/apache/hadoop/hbase/wal/WALFactory.java    |  29 +-
 .../hbase/master/cleaner/TestLogsCleaner.java      | 227 +++++-------
 .../cleaner/TestReplicationHFileCleaner.java       |  43 ++-
 .../replication/TestReplicationOffsetUtil.java     |  52 +++
 .../replication/master/TestLogCleanerBarrier.java  |  60 ++++
 .../master/TestReplicationLogCleaner.java          | 385 +++++++++++++++++++++
 .../regionserver/TestReplicationSourceManager.java |   2 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |   2 +-
 .../apache/hadoop/hbase/wal/TestWALMethods.java    |  14 +
 20 files changed, 1014 insertions(+), 253 deletions(-)
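
A minimal sketch of how the ReplicationLogCleanerBarrier introduced below is intended to be
used; the class and method names are taken from this patch, while the surrounding control flow
is illustrative only:

    // Sketch only: how the barrier serializes the log cleaner against AddPeerProcedure.
    ReplicationLogCleanerBarrier barrier = new ReplicationLogCleanerBarrier();

    // In ReplicationLogCleaner.preClean(): try to enter a cleaning round.
    if (barrier.start()) {
      // snapshot peers, not-fully-dead servers and queue offsets, then filter files;
      // ReplicationLogCleaner.postClean() calls barrier.stop() when the round is done
    } else {
      // an AddPeerProcedure currently holds the barrier, so delete nothing this round
    }

    // In AddPeerProcedure.prePeerModification(): block the cleaner while adding the peer.
    if (!barrier.disable()) {
      // a cleaning round is in progress: suspend the procedure and retry with backoff
    }
    // ... add the peer and record its replication queues ...
    // AddPeerProcedure.releaseLatch() calls barrier.enable() once the peer is fully added.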

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index d37bb620273..e08f5329433 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -50,7 +50,7 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
   }
 
   /**
-   * Used to do some cleanup work
+   * Will be called after each cleaner run.
    */
   default void postClean() {
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
index 86c23114458..e45b6271f7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
@@ -380,7 +380,7 @@ public final class MasterRegion {
       params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
     walRoller.start();
 
-    WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
+    WALFactory walFactory = new WALFactory(conf, server.getServerName(), server, false);
     Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
     Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
     Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 6d0acee76ca..1d02fab5f19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -45,6 +44,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
 
   private boolean enabled;
 
+  private boolean cleanerDisabled;
+
   public AddPeerProcedure() {
   }
 
@@ -84,15 +85,24 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
 
   @Override
   protected void releaseLatch(MasterProcedureEnv env) {
+    if (cleanerDisabled) {
+      env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
+    }
     if (peerConfig.isSyncReplication()) {
       env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
     }
-    ProcedurePrepareLatch.releaseLatch(latch, this);
+    super.releaseLatch(env);
   }
 
   @Override
   protected void prePeerModification(MasterProcedureEnv env)
     throws IOException, ReplicationException, ProcedureSuspendedException {
+    if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.warn("LogCleaner is running while we are adding peer {}, sleep {} secs",
+          peerId, backoff / 1000));
+    }
+    cleanerDisabled = true;
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preAddReplicationPeer(peerId, peerConfig);
@@ -128,9 +138,14 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   @Override
   protected void afterReplay(MasterProcedureEnv env) {
     if (getCurrentState() == getInitialState()) {
-      // will try to acquire the lock when executing the procedure, no need to acquire it here
+      // no need to disable the log cleaner or acquire the lock if we are in the initial state;
+      // we will try to disable and acquire later when actually executing the procedure.
       return;
     }
+    if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+      throw new IllegalStateException("can not disable log cleaner, this should not happen");
+    }
+    cleanerDisabled = true;
     if (peerConfig.isSyncReplication()) {
       if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
         throw new IllegalStateException(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 53270bcbb04..57380920d0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -102,6 +103,9 @@ public class ReplicationPeerManager implements ConfigurationObserver {
   // Only allow to add one sync replication peer concurrently
   private final Semaphore syncReplicationPeerLock = new Semaphore(1);
 
+  private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
+    new ReplicationLogCleanerBarrier();
+
   private final String clusterId;
 
   private volatile Configuration conf;
@@ -705,6 +709,10 @@ public class ReplicationPeerManager implements ConfigurationObserver {
     syncReplicationPeerLock.release();
   }
 
+  public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
+    return replicationLogCleanerBarrier;
+  }
+
   @Override
   public void onConfigurationChange(Configuration conf) {
     this.conf = conf;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6daae10b726..1bdf6a225c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1733,7 +1733,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
    * be hooked up to WAL.
    */
   private void setupWALAndReplication() throws IOException {
-    WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
+    WALFactory factory = new WALFactory(conf, serverName, this, true);
     // TODO Replication make assumptions here based on the default filesystem impl
     Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java
new file mode 100644
index 00000000000..052c5542d47
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationOffsetUtil {
+
+  private ReplicationOffsetUtil() {
+  }
+
+  public static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
+    // if no offset or the offset is just a place marker, replicate
+    if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
+      return true;
+    }
+    // otherwise, compare the timestamp
+    long walTs = AbstractFSWALProvider.getTimestamp(wal);
+    long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
+    if (walTs < startWalTs) {
+      return false;
+    } else if (walTs > startWalTs) {
+      return true;
+    }
+    // if the timestamps are equal, usually it means we should include this wal, but there is a
+    // special case: a negative offset means the wal has already been fully replicated, so here we
+    // should check the offset.
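+    // for example (illustrative names), with offset ("srv.12345", 100) the wal "srv.12345" must
+    // still be replicated starting from position 100, while with offset ("srv.12345", -1) it has
+    // already been fully replicated and can be skipped.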
+    return offset.getOffset() >= 0;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 7135ca9a9b2..f1fd8f8d6b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -17,18 +17,29 @@
  */
 package org.apache.hadoop.hbase.replication.master;
 
-import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,35 +51,129 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for replication before
  * deleting it when its TTL is over.
+ * <p/>
+ * The logic became more complicated after we switched to the table based replication queue
+ * storage; see the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
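+ * <p/>
+ * In short, while an AddPeerProcedure is running the cleaner must not delete any wal, because the
+ * replication queue data for the new peer may not be visible to the cleaner yet, so a wal which
+ * the new peer still needs could be deleted by mistake. The {@link ReplicationLogCleanerBarrier}
+ * makes the two mutually exclusive.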
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
-  private ZKWatcher zkw = null;
-  private boolean shareZK = false;
-  private ReplicationQueueStorage queueStorage;
+  private Set<ServerName> notFullyDeadServers;
+  private Set<String> peerIds;
+  // ServerName -> PeerId -> WalGroup -> Offset
+  // Here the server name is the source server name, so we can be sure that there is only one
+  // queue for a given peer, which is why we can use a String peerId as key instead of a
+  // ReplicationQueueId.
+  private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
+  private ReplicationPeerManager rpm;
+  private Supplier<Set<ServerName>> getNotFullyDeadServers;
+
+  private boolean canFilter;
   private boolean stopped = false;
-  private Set<String> wals;
-  private long readZKTimestamp = 0;
 
   @Override
   public void preClean() {
-    readZKTimestamp = EnvironmentEdgeManager.currentTime();
-    // TODO: revisit the implementation
-    // try {
-    // // The concurrently created new WALs may not be included in the return list,
-    // // but they won't be deleted because they're not in the checking set.
-    // wals = queueStorage.getAllWALs();
-    // } catch (ReplicationException e) {
-    // LOG.warn("Failed to read zookeeper, skipping checking deletable files");
-    // wals = null;
-    // }
+    if (this.getConf() == null) {
+      return;
+    }
+    canFilter = rpm.getReplicationLogCleanerBarrier().start();
+    if (canFilter) {
+      notFullyDeadServers = getNotFullyDeadServers.get();
+      peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
+        .collect(Collectors.toSet());
+      // We must get the not-fully-dead servers first and only then the replication queue data;
+      // this way we can be sure that the missing replication queues for the dead region servers
+      // recorded in the above set have already been added, otherwise the logic in the
+      // filterForDeadRegionServer method may lead us to delete a wal which is still in use.
+      List<ReplicationQueueData> allQueueData;
+      try {
+        allQueueData = rpm.getQueueStorage().listAllQueues();
+      } catch (ReplicationException e) {
+        LOG.error("Can not list all replication queues, give up cleaning", e);
+        rpm.getReplicationLogCleanerBarrier().stop();
+        canFilter = false;
+        notFullyDeadServers = null;
+        peerIds = null;
+        return;
+      }
+      replicationOffsets = new HashMap<>();
+      for (ReplicationQueueData queueData : allQueueData) {
+        ReplicationQueueId queueId = queueData.getId();
+        ServerName serverName = queueId.getServerWALsBelongTo();
+        Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+          replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
+        Map<String, ReplicationGroupOffset> offsets =
+          peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
+        offsets.putAll(queueData.getOffsets());
+      }
+    } else {
+      LOG.info("Skip replication log cleaner because an AddPeerProcedure is running");
+    }
   }
 
   @Override
   public void postClean() {
-    // release memory
-    wals = null;
+    if (canFilter) {
+      rpm.getReplicationLogCleanerBarrier().stop();
+      canFilter = false;
+      // release memory
+      notFullyDeadServers = null;
+      peerIds = null;
+      replicationOffsets = null;
+    }
+  }
+
+  private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
+    return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
+  }
+
+  private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
+    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+      replicationOffsets.get(serverName);
+    if (peerId2Offsets == null) {
+      // if there are replication queues missing, we can not delete the wal
+      return false;
+    }
+    for (String peerId : peerIds) {
+      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
+      // if there is no replication queue for a peer, we can not delete the wal
+      if (offsets == null) {
+        return false;
+      }
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+      ReplicationGroupOffset offset = offsets.get(walGroupId);
+      // if a replication queue still needs to replicate this wal, we can not delete it
+      if (!shouldDelete(offset, file)) {
+        return false;
+      }
+    }
+    // if all replication queues have already finished replicating this wal, we can delete it.
+    return true;
+  }
+
+  private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
+    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+      replicationOffsets.get(serverName);
+    if (peerId2Offsets == null) {
+      // no replication queue for this dead rs, we can delete all wal files for it
+      return true;
+    }
+    for (String peerId : peerIds) {
+      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
+      if (offsets == null) {
+        // for a dead server, we only care about existing replication queues, as we will delete a
+        // queue after we finish replicating it.
+        continue;
+      }
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+      ReplicationGroupOffset offset = offsets.get(walGroupId);
+      // if a replication queue still needs to replicate this wal, we can not delete it
+      if (!shouldDelete(offset, file)) {
+        return false;
+      }
+    }
+    // if all replication queues have already finished replicating this wal, we can delete it.
+    return true;
   }
 
   @Override
@@ -78,10 +183,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     if (this.getConf() == null) {
       return files;
     }
-
-    if (wals == null) {
+    if (!canFilter) {
+      // We can not delete anything if there is an AddPeerProcedure running at the same time.
+      // See HBASE-27214 for more details.
       return Collections.emptyList();
     }
+
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
@@ -90,65 +197,56 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
         if (file == null) {
           return false;
         }
-        String wal = file.getPath().getName();
-        boolean logInReplicationQueue = wals.contains(wal);
-        if (logInReplicationQueue) {
-          LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
+        if (peerIds.isEmpty()) {
+          // no peer, can always delete
+          return true;
+        }
+        // not a valid wal file name, delete
+        if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
+          return true;
+        }
+        // meta wal is always deletable as we will never replicate it
+        if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
+          return true;
+        }
+        ServerName serverName =
+          AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
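+        // a server is "not fully dead" if it is still online or has an unfinished SCP; for such
+        // servers we require a recorded queue for every peer before deleting, while for fully
+        // dead servers a missing queue means replication for it has already finished.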
+        if (notFullyDeadServers.contains(serverName)) {
+          return filterForLiveRegionServer(serverName, file);
+        } else {
+          return filterForDeadRegionServer(serverName, file);
         }
-        return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
       }
     });
   }
 
+  private Set<ServerName> getNotFullyDeadServers(MasterServices services) {
+    List<ServerName> onlineServers = services.getServerManager().getOnlineServersList();
+    return Stream.concat(onlineServers.stream(),
+      services.getMasterProcedureExecutor().getProcedures().stream()
+        .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
+        .map(p -> ((ServerCrashProcedure) p).getServerName()))
+      .collect(Collectors.toSet());
+  }
+
   @Override
   public void init(Map<String, Object> params) {
     super.init(params);
-    try {
-      if (MapUtils.isNotEmpty(params)) {
-        Object master = params.get(HMaster.MASTER);
-        if (master != null && master instanceof HMaster) {
-          zkw = ((HMaster) master).getZooKeeper();
-          shareZK = true;
-        }
-      }
-      if (zkw == null) {
-        zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
+    if (MapUtils.isNotEmpty(params)) {
+      Object master = params.get(HMaster.MASTER);
+      if (master != null && master instanceof MasterServices) {
+        MasterServices m = (MasterServices) master;
+        rpm = m.getReplicationPeerManager();
+        getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
+        return;
       }
-      // TODO: revisit the implementation
-      // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    } catch (IOException e) {
-      LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
-  }
-
-  @InterfaceAudience.Private
-  public void setConf(Configuration conf, ZKWatcher zk) {
-    super.setConf(conf);
-    try {
-      this.zkw = zk;
-      // TODO: revisit the implementation
-      // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
-    } catch (Exception e) {
-      LOG.error("Error while configuring " + this.getClass().getName(), e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void setConf(Configuration conf, ZKWatcher zk,
-    ReplicationQueueStorage replicationQueueStorage) {
-    super.setConf(conf);
-    this.zkw = zk;
-    this.queueStorage = replicationQueueStorage;
+    throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
   }
 
   @Override
   public void stop(String why) {
-    if (this.stopped) return;
     this.stopped = true;
-    if (!shareZK && this.zkw != null) {
-      LOG.info("Stopping " + this.zkw);
-      this.zkw.close();
-    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java
new file mode 100644
index 00000000000..d8756518728
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A barrier to guard the execution of {@link ReplicationLogCleaner}.
+ * <p/>
+ * We introduce this class because there could be a race between
+ * {@link org.apache.hadoop.hbase.master.replication.AddPeerProcedure} and
+ * {@link ReplicationLogCleaner}. See HBASE-27214 for more details.
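+ * <p/>
+ * The cleaner calls {@link #start()} before a cleaning round and {@link #stop()} after it, while
+ * AddPeerProcedure calls {@link #disable()} before adding a peer and {@link #enable()} once the
+ * peer is fully added. {@link #start()} fails while the barrier is disabled, and
+ * {@link #disable()} fails while a cleaning round is running, so the two can never overlap.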
+ */
+@InterfaceAudience.Private
+public class ReplicationLogCleanerBarrier {
+
+  private enum State {
+    // the cleaner is not running
+    NOT_RUNNING,
+    // the cleaner is running
+    RUNNING,
+    // the cleaner is disabled
+    DISABLED
+  }
+
+  private State state = State.NOT_RUNNING;
+
+  // we could have multiple AddPeerProcedure running at the same time, so here we need to do
+  // reference counting.
+  private int numberDisabled = 0;
+
+  public synchronized boolean start() {
+    if (state == State.NOT_RUNNING) {
+      state = State.RUNNING;
+      return true;
+    }
+    if (state == State.DISABLED) {
+      return false;
+    }
+    throw new IllegalStateException("Unexpected state " + state);
+  }
+
+  public synchronized void stop() {
+    if (state != State.RUNNING) {
+      throw new IllegalStateException("Unexpected state " + state);
+    }
+    state = State.NOT_RUNNING;
+  }
+
+  public synchronized boolean disable() {
+    if (state == State.RUNNING) {
+      return false;
+    }
+    if (state == State.NOT_RUNNING) {
+      state = State.DISABLED;
+    }
+    numberDisabled++;
+    return true;
+  }
+
+  public synchronized void enable() {
+    if (state != State.DISABLED) {
+      throw new IllegalStateException("Unexpected state " + state);
+    }
+    numberDisabled--;
+    if (numberDisabled == 0) {
+      state = State.NOT_RUNNING;
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5d77600a187..b521766ae3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
@@ -809,22 +810,7 @@ public class ReplicationSourceManager {
     if (AbstractFSWALProvider.isMetaFile(wal)) {
       return false;
     }
-    // if no offset or the offset is just a place marker, replicate
-    if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
-      return true;
-    }
-    // otherwise, compare the timestamp
-    long walTs = AbstractFSWALProvider.getTimestamp(wal);
-    long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
-    if (walTs < startWalTs) {
-      return false;
-    } else if (walTs > startWalTs) {
-      return true;
-    }
-    // if the timestamp equals, usually it means we should include this wal but there is a special
-    // case, a negative offset means the wal has already been fully replicated, so here we should
-    // check the offset.
-    return offset.getOffset() >= 0;
+    return ReplicationOffsetUtil.shouldReplicate(offset, wal);
   }
 
   void claimQueue(ReplicationQueueId queueId) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 50ffd6df1af..b63ad473719 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -117,7 +117,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
       System.out.println("Start Replication Server start");
       Replication replication = new Replication();
       replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
-        new WALFactory(conf, "test", null, false));
+        new WALFactory(conf,
+          ServerName
+            .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
+          null, false));
       ReplicationSourceManager manager = replication.getReplicationManager();
       manager.init();
       claimReplicationQueues(zkw, manager);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index db39a8ba023..48086694999 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
@@ -582,4 +586,29 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   public static String getWALPrefixFromWALName(String name) {
     return getWALNameGroupFromWALName(name, 1);
   }
+
+  private static final Pattern SERVER_NAME_PATTERN = Pattern.compile("^[^"
+    + ServerName.SERVERNAME_SEPARATOR + "]+" + ServerName.SERVERNAME_SEPARATOR
+    + Addressing.VALID_PORT_REGEX + ServerName.SERVERNAME_SEPARATOR + Addressing.VALID_PORT_REGEX);
+
+  /**
+   * Parse the server name from a wal name. A wal's name always starts with a server name in
+   * non-test code.
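+   * <p/>
+   * For example (illustrative name), "srv%2C16010%2C1234.1234" is URL-decoded to
+   * "srv,16010,1234.1234" and parses to the server name "srv,16010,1234".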
+   * @throws IllegalArgumentException if the name passed in does not start with a server name
+   * @return the server name
+   */
+  public static ServerName parseServerNameFromWALName(String name) {
+    String decoded;
+    try {
+      decoded = URLDecoder.decode(name, StandardCharsets.UTF_8.name());
+    } catch (UnsupportedEncodingException e) {
+      throw new AssertionError("should never happen", e);
+    }
+    Matcher matcher = SERVER_NAME_PATTERN.matcher(decoded);
+    if (matcher.find()) {
+      return ServerName.valueOf(matcher.group());
+    } else {
+      throw new IllegalArgumentException(name + " does not start with a server name");
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 92d96c5e210..bc0a9eec73a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@@ -191,17 +192,35 @@ public class WALFactory {
   }
 
   /**
-   * @param conf      must not be null, will keep a reference to read params in later reader/writer
-   *                  instances.
-   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
-   *                  to make a directory
+   * Create a WALFactory.
    */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java")
   public WALFactory(Configuration conf, String factoryId) throws IOException {
     // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
     // for HMaster or HRegionServer which take system table only. See HBASE-19999
     this(conf, factoryId, null, true);
   }
 
+  /**
+   * Create a WALFactory.
+   * <p/>
+   * This is the constructor you should use when creating a WALFactory in normal code, to make sure
+   * that the {@code factoryId} is the server name. We need this assumption in some places for
+   * parsing the server name out from the wal file name.
+   * @param conf                             must not be null, will keep a reference to read params
+   *                                         in later reader/writer instances.
+   * @param serverName                       used to generate the factoryId, which will be
+   *                                         prepended to the final file name
+   * @param abortable                        the server associated with this WAL file
+   * @param enableSyncReplicationWALProvider whether to wrap the wal provider in a
+   *                                         {@link SyncReplicationWALProvider}
+   */
+  public WALFactory(Configuration conf, ServerName serverName, Abortable abortable,
+    boolean enableSyncReplicationWALProvider) throws IOException {
+    this(conf, serverName.toString(), abortable, enableSyncReplicationWALProvider);
+  }
+
   /**
    * @param conf                             must not be null, will keep a reference to read params
    *                                         in later reader/writer instances.
@@ -211,7 +230,7 @@ public class WALFactory {
    * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
    *                                         {@link SyncReplicationWALProvider}
    */
-  public WALFactory(Configuration conf, String factoryId, Abortable abortable,
+  private WALFactory(Configuration conf, String factoryId, Abortable abortable,
     boolean enableSyncReplicationWALProvider) throws IOException {
     // until we've moved reader/writer construction down into providers, this initialization must
     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 1a0537bcbaf..d7ba6c227c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -18,57 +18,60 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MockServer;
-import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// revisit later after we implement new replication log cleaner
-@Ignore
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MasterTests.class, MediumTests.class })
 public class TestLogsCleaner {
 
@@ -88,22 +91,29 @@ public class TestLogsCleaner {
 
   private static DirScanPool POOL;
 
+  private static String peerId = "1";
+
+  private MasterServices masterServices;
+
+  private ReplicationQueueStorage queueStorage;
+
+  @Rule
+  public final TableNameTestRule tableNameRule = new TableNameTestRule();
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniZKCluster();
-    TEST_UTIL.startMiniDFSCluster(1);
+    TEST_UTIL.startMiniCluster();
     POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
   }
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniZKCluster();
-    TEST_UTIL.shutdownMiniDFSCluster();
+    TEST_UTIL.shutdownMiniCluster();
     POOL.shutdownNow();
   }
 
   @Before
-  public void beforeTest() throws IOException {
+  public void beforeTest() throws Exception {
     conf = TEST_UTIL.getConfiguration();
 
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
@@ -112,14 +122,51 @@ public class TestLogsCleaner {
 
     // root directory
     fs.mkdirs(OLD_WALS_DIR);
+
+    TableName tableName = tableNameRule.getTableName();
+    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+    TEST_UTIL.getAdmin().createTable(td);
+    TEST_UTIL.waitTableAvailable(tableName);
+    queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName);
+
+    masterServices = mock(MasterServices.class);
+    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
+    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
+    when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
+    when(rpm.getQueueStorage()).thenReturn(queueStorage);
+    when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
+    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
+    ServerManager sm = mock(ServerManager.class);
+    when(masterServices.getServerManager()).thenReturn(sm);
+    when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
+    @SuppressWarnings("unchecked")
+    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
+    when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
+    when(procExec.getProcedures()).thenReturn(Collections.emptyList());
   }
 
   /**
    * This test verifies that LogCleaner works correctly with WALs and Procedure WALs located in the same
-   * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from
-   * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp
-   * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2
-   * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory
+   * oldWALs directory.
+   * <p/>
+   * Created files:
+   * <ul>
+   * <li>2 invalid files</li>
+   * <li>5 old Procedure WALs</li>
+   * <li>30 old WALs from which 3 are in replication</li>
+   * <li>5 recent Procedure WALs</li>
+   * <li>1 recent WAL</li>
+   * <li>1 very new WAL (timestamp in future)</li>
+   * <li>masterProcedureWALs subdirectory</li>
+   * </ul>
+   * Files which should stay:
+   * <ul>
+   * <li>3 replication WALs</li>
+   * <li>2 new WALs</li>
+   * <li>5 latest Procedure WALs</li>
+   * <li>masterProcedureWALs subdirectory</li>
+   * </ul>
    */
   @Test
   public void testLogCleaning() throws Exception {
@@ -131,9 +178,6 @@ public class TestLogsCleaner {
 
     HMaster.decorateMasterConfiguration(conf);
     Server server = new DummyServer();
-    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
-      .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf);
-
     String fakeMachineName =
       URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
 
@@ -159,14 +203,12 @@ public class TestLogsCleaner {
     for (int i = 1; i <= 30; i++) {
       Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
       fs.createNewFile(fileName);
-      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
-      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
-      if (i % (30 / 3) == 0) {
-        // queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
-        LOG.info("Replication log file: " + fileName);
-      }
     }
-
+    // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
+    masterServices.getReplicationPeerManager().listPeers(null)
+      .add(new ReplicationPeerDescription(peerId, true, null, null));
+    queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
+      new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
     // Case 5: 5 Procedure WALs that are new, will stay
     for (int i = 6; i <= 10; i++) {
       Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
@@ -189,7 +231,8 @@ public class TestLogsCleaner {
     // 10 procedure WALs
     assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
 
-    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
+    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
+      ImmutableMap.of(HMaster.MASTER, masterServices));
     cleaner.chore();
 
     // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
@@ -208,98 +251,14 @@ public class TestLogsCleaner {
     }
   }
 
-  @Test
-  public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception {
-    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
-
-    List<FileStatus> dummyFiles = Arrays.asList(
-      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")),
-      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2")));
-
-    FaultyZooKeeperWatcher faultyZK =
-      new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
-    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
-
-    try {
-      faultyZK.init(false);
-      ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory
-        .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf));
-      // doAnswer(new Answer<Object>() {
-      // @Override
-      // public Object answer(InvocationOnMock invocation) throws Throwable {
-      // try {
-      // return invocation.callRealMethod();
-      // } catch (ReplicationException e) {
-      // LOG.debug("Caught Exception", e);
-      // getListOfReplicatorsFailed.set(true);
-      // throw e;
-      // }
-      // }
-      // }).when(queueStorage).getAllWALs();
-
-      cleaner.setConf(conf, faultyZK, queueStorage);
-      // should keep all files due to a ConnectionLossException getting the queues znodes
-      cleaner.preClean();
-      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
-
-      assertTrue(getListOfReplicatorsFailed.get());
-      assertFalse(toDelete.iterator().hasNext());
-      assertFalse(cleaner.isStopped());
-
-      // zk recovery.
-      faultyZK.init(true);
-      cleaner.preClean();
-      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
-      Iterator<FileStatus> iter = filesToDelete.iterator();
-      assertTrue(iter.hasNext());
-      assertEquals(new Path("log1"), iter.next().getPath());
-      assertTrue(iter.hasNext());
-      assertEquals(new Path("log2"), iter.next().getPath());
-      assertFalse(iter.hasNext());
-
-    } finally {
-      faultyZK.close();
-    }
-  }
-
-  /**
-   * When zk is working both files should be returned
-   * @throws Exception from ZK watcher
-   */
-  @Test
-  public void testZooKeeperNormal() throws Exception {
-    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
-
-    // Subtract 1000 from current time so modtime is for sure older
-    // than 'now'.
-    long modTime = EnvironmentEdgeManager.currentTime() - 1000;
-    List<FileStatus> dummyFiles =
-      Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")),
-        new FileStatus(100, false, 3, 100, modTime, new Path("log2")));
-
-    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
-    try {
-      cleaner.setConf(conf, zkw);
-      cleaner.preClean();
-      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
-      Iterator<FileStatus> iter = filesToDelete.iterator();
-      assertTrue(iter.hasNext());
-      assertEquals(new Path("log1"), iter.next().getPath());
-      assertTrue(iter.hasNext());
-      assertEquals(new Path("log2"), iter.next().getPath());
-      assertFalse(iter.hasNext());
-    } finally {
-      zkw.close();
-    }
-  }
-
   @Test
   public void testOnConfigurationChange() throws Exception {
     // Prepare environments
     Server server = new DummyServer();
 
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
-    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
+    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
+      ImmutableMap.of(HMaster.MASTER, masterServices));
     int size = cleaner.getSizeOfCleaners();
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
       cleaner.getCleanerThreadTimeoutMsec());
@@ -338,7 +297,7 @@ public class TestLogsCleaner {
     }
   }
 
-  static class DummyServer extends MockServer {
+  private static final class DummyServer extends MockServer {
 
     @Override
     public Configuration getConfiguration() {
@@ -355,26 +314,4 @@ public class TestLogsCleaner {
       return null;
     }
   }
-
-  static class FaultyZooKeeperWatcher extends ZKWatcher {
-    private RecoverableZooKeeper zk;
-
-    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
-      throws ZooKeeperConnectionException, IOException {
-      super(conf, identifier, abortable);
-    }
-
-    public void init(boolean autoRecovery) throws Exception {
-      this.zk = spy(super.getRecoverableZooKeeper());
-      if (!autoRecovery) {
-        doThrow(new KeeperException.ConnectionLossException()).when(zk)
-          .getChildren("/hbase/replication/rs", null);
-      }
-    }
-
-    @Override
-    public RecoverableZooKeeper getRecoverableZooKeeper() {
-      return zk;
-    }
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 2409b081cce..5aef1eaf1c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -26,6 +26,7 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +35,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -48,19 +51,19 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.MockServer;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// TODO: revisit later
-@Ignore
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MasterTests.class, SmallTests.class })
 public class TestReplicationHFileCleaner {
 
@@ -71,19 +74,25 @@ public class TestReplicationHFileCleaner {
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
   private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
   private static Server server;
+  private static final TableName tableName = TableName.valueOf("test_cleaner");
   private static ReplicationQueueStorage rq;
   private static ReplicationPeers rp;
   private static final String peerId = "TestReplicationHFileCleaner";
   private static Configuration conf = TEST_UTIL.getConfiguration();
-  static FileSystem fs = null;
-  Path root;
+  private static FileSystem fs = null;
+  private static Map<String, Object> params;
+  private Path root;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster();
     server = new DummyServer();
+    params = ImmutableMap.of(HMaster.MASTER, server);
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     HMaster.decorateMasterConfiguration(conf);
+    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+    TEST_UTIL.getAdmin().createTable(td);
+    conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
     rp =
       ReplicationFactory.getReplicationPeers(server.getFileSystem(), server.getZooKeeper(), conf);
     rp.init();
@@ -93,7 +102,7 @@ public class TestReplicationHFileCleaner {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniCluster();
   }
 
   @Before
@@ -116,6 +125,13 @@ public class TestReplicationHFileCleaner {
     rp.getPeerStorage().removePeer(peerId);
   }
 
+  private ReplicationHFileCleaner createCleaner() {
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+    cleaner.init(params);
+    return cleaner;
+  }
+
   @Test
   public void testIsFileDeletable() throws IOException, ReplicationException {
     // 1. Create a file
@@ -123,8 +139,7 @@ public class TestReplicationHFileCleaner {
     fs.createNewFile(file);
     // 2. Assert file is successfully created
     assertTrue("Test file not created!", fs.exists(file));
-    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
-    cleaner.setConf(conf);
+    ReplicationHFileCleaner cleaner = createCleaner();
     // 3. Assert that file as is should be deletable
     assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
       + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file)));
@@ -161,8 +176,7 @@ public class TestReplicationHFileCleaner {
     // 2. Add one file to hfile-refs queue
     rq.addHFileRefs(peerId, hfiles);
 
-    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
-    cleaner.setConf(conf);
+    ReplicationHFileCleaner cleaner = createCleaner();
     Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
     int i = 0;
     while (deletableFilesIterator.hasNext() && i < 2) {
@@ -183,6 +197,15 @@ public class TestReplicationHFileCleaner {
       return TEST_UTIL.getConfiguration();
     }
 
+    @Override
+    public ZKWatcher getZooKeeper() {
+      try {
+        return TEST_UTIL.getZooKeeperWatcher();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
     @Override
     public Connection getConnection() {
       try {
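Worth noting while reading the test changes above: the cleaners are no longer wired up
through ZooKeeper alone, they must be handed the master instance via init(params) under
the HMaster.MASTER key, which is exactly what createCleaner() does. A minimal sketch of
the delegate-side counterpart of that call (illustrative only; the field type and the
null handling are assumptions, not the committed code):

    // Hypothetical delegate-side counterpart of cleaner.init(params) as used
    // in the tests: fish the master out of the params map.
    private Server master;

    @Override
    public void init(Map<String, Object> params) {
      if (params != null && params.containsKey(HMaster.MASTER)) {
        this.master = (Server) params.get(HMaster.MASTER);
      }
    }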
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java
new file mode 100644
index 00000000000..f54a4958374
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.replication.ReplicationOffsetUtil.shouldReplicate;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, SmallTests.class })
+public class TestReplicationOffsetUtil {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationOffsetUtil.class);
+
+  @Test
+  public void test() {
+    assertTrue(shouldReplicate(null, "whatever"));
+    assertTrue(shouldReplicate(ReplicationGroupOffset.BEGIN, "whatever"));
+    ServerName sn = ServerName.valueOf("host", 16010, EnvironmentEdgeManager.currentTime());
+    ReplicationGroupOffset offset = new ReplicationGroupOffset(sn + ".12345", 100);
+    assertTrue(shouldReplicate(offset, sn + ".12346"));
+    assertFalse(shouldReplicate(offset, sn + ".12344"));
+    assertTrue(shouldReplicate(offset, sn + ".12345"));
+    // -1 means replication of this file has finished, so it should not be replicated
+    assertFalse(shouldReplicate(new ReplicationGroupOffset(sn + ".12345", -1), sn + ".12345"));
+  }
+}
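The offset semantics this test pins down are easy to restate in code. Below is a sketch
of the decision rule, inferred from the assertions above, assuming ReplicationGroupOffset
exposes getWal() and getOffset() and that WAL names carry a trailing numeric
".<sequence>" suffix (the committed logic lives in ReplicationOffsetUtil):

    // Sketch: should the given WAL still be replicated, given the recorded offset?
    static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
      // No progress recorded yet: replicate everything.
      if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
        return true;
      }
      long walSeq = Long.parseLong(wal.substring(wal.lastIndexOf('.') + 1));
      String offsetWal = offset.getWal();
      long offsetSeq = Long.parseLong(offsetWal.substring(offsetWal.lastIndexOf('.') + 1));
      if (walSeq != offsetSeq) {
        // Older files are fully replicated; newer files are still pending.
        return walSeq > offsetSeq;
      }
      // Same file: a byte offset of -1 marks it as finished.
      return offset.getOffset() >= 0;
    }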
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java
new file mode 100644
index 00000000000..06cb85523d3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestLogCleanerBarrier {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLogCleanerBarrier.class);
+
+  @Test
+  public void test() {
+    ReplicationLogCleanerBarrier barrier = new ReplicationLogCleanerBarrier();
+    assertThrows(IllegalStateException.class, () -> barrier.stop());
+    assertThrows(IllegalStateException.class, () -> barrier.enable());
+    assertTrue(barrier.start());
+    assertThrows(IllegalStateException.class, () -> barrier.start());
+    assertThrows(IllegalStateException.class, () -> barrier.enable());
+    assertFalse(barrier.disable());
+    assertThrows(IllegalStateException.class, () -> barrier.enable());
+    barrier.stop();
+
+    for (int i = 0; i < 3; i++) {
+      assertTrue(barrier.disable());
+      assertFalse(barrier.start());
+    }
+    for (int i = 0; i < 3; i++) {
+      assertFalse(barrier.start());
+      barrier.enable();
+    }
+    assertTrue(barrier.start());
+  }
+}
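Read together, these assertions describe a small state machine: the log cleaner brackets
each run with start()/stop(), procedures that must not race with it bracket their
critical sections with disable()/enable(), and disables are counted so they nest. A
sketch inferred from the test (illustrative, not the committed class):

    // Sketch of ReplicationLogCleanerBarrier semantics as asserted above.
    class BarrierSketch {
      private boolean running; // a cleaner run is in progress
      private int disableCount; // outstanding disable() calls

      // Cleaner calls this before a run; false means "skip this run".
      synchronized boolean start() {
        if (running) {
          throw new IllegalStateException("already started");
        }
        if (disableCount > 0) {
          return false; // fenced off by a procedure
        }
        running = true;
        return true;
      }

      // Cleaner calls this after a run it actually started.
      synchronized void stop() {
        if (!running) {
          throw new IllegalStateException("not started");
        }
        running = false;
      }

      // Procedures call this; false means the cleaner is mid-run, retry later.
      synchronized boolean disable() {
        if (running) {
          return false;
        }
        disableCount++;
        return true;
      }

      // Undo exactly one successful disable().
      synchronized void enable() {
        if (disableCount == 0) {
          throw new IllegalStateException("not disabled");
        }
        disableCount--;
      }
    }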
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
new file mode 100644
index 00000000000..7a227fb0603
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestReplicationLogCleaner {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationLogCleaner.class);
+
+  private static final Configuration CONF = HBaseConfiguration.create();
+
+  private MasterServices services;
+
+  private ReplicationLogCleaner cleaner;
+
+  @Before
+  public void setUp() throws ReplicationException {
+    services = mock(MasterServices.class);
+    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
+    when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
+    when(services.getReplicationPeerManager()).thenReturn(rpm);
+    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
+    ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
+    when(rpm.getQueueStorage()).thenReturn(rqs);
+    when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
+    ServerManager sm = mock(ServerManager.class);
+    when(services.getServerManager()).thenReturn(sm);
+    when(sm.getOnlineServersList()).thenReturn(new ArrayList<>());
+    @SuppressWarnings("unchecked")
+    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
+    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
+    when(procExec.getProcedures()).thenReturn(new ArrayList<>());
+
+    cleaner = new ReplicationLogCleaner();
+    cleaner.setConf(CONF);
+    Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services);
+    cleaner.init(params);
+  }
+
+  @After
+  public void tearDown() {
+    cleaner.postClean();
+  }
+
+  private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner,
+    Iterable<FileStatus> files) {
+    cleaner.preClean();
+    return cleaner.getDeletableFiles(files);
+  }
+
+  private static FileStatus createFileStatus(Path path) {
+    return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path);
+  }
+
+  private static FileStatus createFileStatus(ServerName sn, int number) {
+    Path path = new Path(sn.toString() + "." + number);
+    return createFileStatus(path);
+  }
+
+  private static ReplicationPeerDescription createPeer(String peerId) {
+    return new ReplicationPeerDescription(peerId, true, null, null);
+  }
+
+  private void addServer(ServerName serverName) {
+    services.getServerManager().getOnlineServersList().add(serverName);
+  }
+
+  private void addSCP(ServerName serverName, boolean finished) {
+    ServerCrashProcedure scp = mock(ServerCrashProcedure.class);
+    when(scp.getServerName()).thenReturn(serverName);
+    when(scp.isFinished()).thenReturn(finished);
+    services.getMasterProcedureExecutor().getProcedures().add(scp);
+  }
+
+  private void addPeer(String... peerIds) {
+    services.getReplicationPeerManager().listPeers(null).addAll(
+      Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList()));
+  }
+
+  private void addQueueData(ReplicationQueueData... datas) throws ReplicationException {
+    services.getReplicationPeerManager().getQueueStorage().listAllQueues()
+      .addAll(Arrays.asList(datas));
+  }
+
+  @Test
+  public void testNoConf() {
+    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
+    List<FileStatus> files = Arrays.asList(new FileStatus());
+    assertSame(files, runCleaner(cleaner, files));
+    cleaner.postClean();
+  }
+
+  @Test
+  public void testCanNotFilter() {
+    assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable());
+    List<FileStatus> files = Arrays.asList(new FileStatus());
+    assertSame(Collections.emptyList(), runCleaner(cleaner, files));
+  }
+
+  @Test
+  public void testNoPeer() {
+    Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime());
+    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
+    FileStatus file = createFileStatus(path);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testNotValidWalFile() {
+    addPeer("1");
+    Path path = new Path("/whatever");
+    assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName()));
+    FileStatus file = createFileStatus(path);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testMetaWalFile() {
+    addPeer("1");
+    Path path = new Path(
+      "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID);
+    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
+    assertTrue(AbstractFSWALProvider.isMetaFile(path));
+    FileStatus file = createFileStatus(path);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testLiveRegionServerNoQueues() {
+    addPeer("1");
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addServer(sn);
+    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
+    assertThat(runCleaner(cleaner, files), emptyIterable());
+  }
+
+  @Test
+  public void testLiveRegionServerWithSCPNoQueues() {
+    addPeer("1");
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addSCP(sn, false);
+    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
+    assertThat(runCleaner(cleaner, files), emptyIterable());
+  }
+
+  @Test
+  public void testDeadRegionServerNoQueues() {
+    addPeer("1");
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    FileStatus file = createFileStatus(sn, 1);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testDeadRegionServerWithSCPNoQueues() {
+    addPeer("1");
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addSCP(sn, true);
+    FileStatus file = createFileStatus(sn, 1);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testLiveRegionServerMissingQueue() throws ReplicationException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addPeer(peerId1, peerId2);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addServer(sn);
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    addQueueData(data1);
+    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
+  }
+
+  @Test
+  public void testLiveRegionServerShouldNotDelete() throws ReplicationException {
+    String peerId = "1";
+    addPeer(peerId);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addServer(sn);
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
+    addQueueData(data);
+    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
+  }
+
+  @Test
+  public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addPeer(peerId1, peerId2);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addServer(sn);
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
+    addQueueData(data1, data2);
+    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
+  }
+
+  @Test
+  public void testLiveRegionServerShouldDelete() throws ReplicationException {
+    String peerId = "1";
+    addPeer(peerId);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addServer(sn);
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addPeer(peerId1, peerId2);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    addServer(sn);
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    addQueueData(data1, data2);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testDeadRegionServerMissingQueue() throws ReplicationException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addPeer(peerId1, peerId2);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    addQueueData(data1);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testDeadRegionServerShouldNotDelete() throws ReplicationException {
+    String peerId = "1";
+    addPeer(peerId);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
+    addQueueData(data);
+    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
+  }
+
+  @Test
+  public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addPeer(peerId1, peerId2);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
+    addQueueData(data1, data2);
+    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
+  }
+
+  @Test
+  public void testDeadRegionServerShouldDelete() throws ReplicationException {
+    String peerId = "1";
+    addPeer(peerId);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addPeer(peerId1, peerId2);
+    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+    FileStatus file = createFileStatus(sn, 1);
+    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
+      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+    addQueueData(data1, data2);
+    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+    assertSame(file, iter.next());
+    assertFalse(iter.hasNext());
+  }
+}
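Taken together, the cases above pin down the per-file decision rule: non-WAL and
meta-WAL files are always deletable; otherwise a WAL survives if any peer might still
need it, where "might need" differs for live and dead owners. A condensed sketch
(illustrative; onlineServers, hasUnfinishedSCP, peerIds and queueOffsetFor are
hypothetical stand-ins for the lookups the real ReplicationLogCleaner performs against
ServerManager, the procedure executor and ReplicationQueueStorage):

    // Sketch of the deletability rule the tests above encode.
    boolean deletable(FileStatus wal) {
      Path path = wal.getPath();
      String name = path.getName();
      if (!AbstractFSWALProvider.validateWALFilename(name)
        || AbstractFSWALProvider.isMetaFile(path)) {
        return true; // not a replicated WAL, nothing to wait for
      }
      ServerName owner = AbstractFSWALProvider.parseServerNameFromWALName(name);
      boolean dead = !onlineServers.contains(owner) && !hasUnfinishedSCP(owner);
      for (String peerId : peerIds) {
        ReplicationGroupOffset offset = queueOffsetFor(owner, peerId, name);
        if (offset == null) {
          // No queue recorded: a live server may simply not have written its
          // queue yet, so keep the file; for a dead server the queue has been
          // claimed and removed, so this peer no longer needs the file.
          if (!dead) {
            return false;
          }
        } else if (ReplicationOffsetUtil.shouldReplicate(offset, name)) {
          return false; // this peer still needs the file
        }
      }
      return true;
    }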
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 6aba327d791..b7564ed9168 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -190,7 +190,7 @@ public class TestReplicationSourceManager {
 
     replication = new Replication();
     replication.initialize(server, FS, logDir, oldLogDir,
-      new WALFactory(CONF, "test", null, false));
+      new WALFactory(CONF, server.getServerName(), null, false));
     manager = replication.getReplicationManager();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index b59ebc0d9a6..26c1152c05a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -630,7 +630,7 @@ public class TestWALFactory {
     assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass());
 
     // if providers are not set and do not enable SyncReplicationWALProvider
-    walFactory = new WALFactory(conf, this.currentServername.toString(), null, false);
+    walFactory = new WALFactory(conf, this.currentServername, null, false);
     assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index 8273b3d6041..6a1e98d9fd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -183,4 +184,17 @@ public class TestWALMethods {
     return entry;
   }
 
+  @Test
+  public void testParseServerNameFromWALName() {
+    assertEquals(ServerName.valueOf("abc,123,123"),
+      AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.1.12345.meta"));
+    assertEquals(ServerName.valueOf("abc,123,123"),
+      AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.12345"));
+    assertEquals(ServerName.valueOf("abc,123,123"),
+      AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123"));
+    assertThrows(IllegalArgumentException.class,
+      () -> AbstractFSWALProvider.parseServerNameFromWALName("test,abc,123,123.12345"));
+    assertThrows(IllegalArgumentException.class,
+      () -> AbstractFSWALProvider.parseServerNameFromWALName("abc"));
+  }
 }
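For reference, the parsing behavior asserted in testParseServerNameFromWALName can be
realized in a few lines. This is a sketch inferred from the assertions; see
AbstractFSWALProvider in this patch for the committed version:

    // Sketch: WAL names look like "<host>,<port>,<startcode>[.<suffix>...]",
    // so the server name is everything before the first dot.
    static ServerName parseServerNameFromWALName(String name) {
      int dot = name.indexOf('.');
      String prefix = dot >= 0 ? name.substring(0, dot) : name;
      try {
        return ServerName.valueOf(prefix);
      } catch (Exception e) {
        throw new IllegalArgumentException(name + " is not a valid wal file name", e);
      }
    }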