You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2022/11/08 21:05:48 UTC

[hbase] branch branch-2 updated: HBASE-26913 Replication Observability Framework (#4862)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ecf3debd425 HBASE-26913 Replication Observability Framework (#4862)
ecf3debd425 is described below

commit ecf3debd425dce17e1af50a7eaa4cf7783d0f2ac
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Tue Nov 8 13:05:42 2022 -0800

    HBASE-26913 Replication Observability Framework (#4862)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../hadoop/hbase/slowlog/SlowLogTableAccessor.java |  23 +-
 .../java/org/apache/hadoop/hbase/HConstants.java   |   8 +
 hbase-common/src/main/resources/hbase-default.xml  |   2 +-
 .../namequeues/MetricsWALEventTrackerSource.java   |  65 +++++
 ...p.hbase.namequeues.MetricsWALEventTrackerSource |  18 ++
 .../MetricsWALEventTrackerSourceImpl.java          |  59 +++++
 hbase-protocol-shaded/src/main/protobuf/WAL.proto  |   9 +
 .../org/apache/hadoop/hbase/master/HMaster.java    |   6 +
 .../WALEventTrackerTableCreator.java               |  87 +++++++
 .../hadoop/hbase/namequeues/LogEventHandler.java   |   8 +-
 .../hadoop/hbase/namequeues/NamedQueuePayload.java |   6 +-
 .../hbase/namequeues/NamedQueueRecorder.java       |   8 +-
 .../hadoop/hbase/namequeues/NamedQueueService.java |   3 +-
 ...leOpsChore.java => NamedQueueServiceChore.java} |  32 ++-
 .../hbase/namequeues/SlowLogPersistentService.java |   7 +-
 .../hbase/namequeues/WALEventTrackerPayload.java   |  73 ++++++
 .../namequeues/WALEventTrackerQueueService.java    | 148 ++++++++++++
 .../namequeues/WALEventTrackerTableAccessor.java   | 140 +++++++++++
 .../impl/BalancerDecisionQueueService.java         |   3 +-
 .../impl/BalancerRejectionQueueService.java        |   3 +-
 .../hbase/namequeues/impl/SlowLogQueueService.java |   5 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   | 100 +++++---
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   2 +-
 .../regionserver/wal/WALEventTrackerListener.java  |  94 ++++++++
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |  12 +
 .../master/ReplicationSinkTrackerTableCreator.java |  99 ++++++++
 .../regionserver/ReplicationMarkerChore.java       | 129 ++++++++++
 .../replication/regionserver/ReplicationSink.java  |  53 +++++
 .../regionserver/ReplicationSource.java            |   5 +
 .../ReplicationSourceWALActionListener.java        |   4 +
 .../regionserver/ReplicationSourceWALReader.java   |  40 ++++
 .../java/org/apache/hadoop/hbase/wal/WALEdit.java  |  40 ++++
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |   7 +
 .../hbase/namequeues/TestWALEventTracker.java      | 246 +++++++++++++++++++
 .../TestWALEventTrackerTableAccessor.java          |  58 +++++
 .../TestWalEventTrackerQueueService.java           |  86 +++++++
 .../hadoop/hbase/regionserver/wal/TestWALEdit.java |  61 +++++
 .../regionserver/TestReplicationMarker.java        | 265 +++++++++++++++++++++
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |  39 +++
 39 files changed, 1971 insertions(+), 82 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
index 5ea6144d037..e6db8f43017 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
@@ -21,12 +21,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -48,8 +46,6 @@ public class SlowLogTableAccessor {
 
   private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);
 
-  private static Connection connection;
-
   /**
    * hbase:slowlog table name - can be enabled with config -
    * hbase.regionserver.slowlog.systable.enabled
@@ -66,10 +62,10 @@ public class SlowLogTableAccessor {
   /**
    * Add slow/large log records to hbase:slowlog table
    * @param slowLogPayloads List of SlowLogPayload to process
-   * @param configuration   Configuration to use for connection
+   * @param connection      connection
    */
   public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
-    final Configuration configuration) {
+    Connection connection) {
     List<Put> puts = new ArrayList<>(slowLogPayloads.size());
     for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
       final byte[] rowKey = getRowKey(slowLogPayload);
@@ -102,26 +98,12 @@ public class SlowLogTableAccessor {
       puts.add(put);
     }
     try {
-      if (connection == null) {
-        createConnection(configuration);
-      }
       doPut(connection, puts);
     } catch (Exception e) {
       LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
     }
   }
 
-  private static synchronized void createConnection(Configuration configuration)
-    throws IOException {
-    Configuration conf = new Configuration(configuration);
-    // rpc timeout: 20s
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
-    // retry count: 5
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
-    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-    connection = ConnectionFactory.createConnection(conf);
-  }
-
   /**
    * Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep
    * records with sorted order of time, however records added at the very same time could be in
@@ -140,5 +122,4 @@ public class SlowLogTableAccessor {
     final long rowKeyLong = Long.parseLong(timeAndHashcode);
     return Bytes.toBytes(rowKeyLong);
   }
-
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 91861d34e92..609da637852 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1643,6 +1643,14 @@ public final class HConstants {
     "hbase.regionserver.slowlog.systable.enabled";
   public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
 
+  @Deprecated
+  // TODO: fill in the version this was deprecated in and the version it will be removed in.
+  // Use the hbase.regionserver.named.queue.chore.duration config property instead.
+  public static final String SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY =
+    "hbase.slowlog.systable.chore.duration";
+  // Default 10 mins.
+  public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000;
+
   public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
     "hbase.shell.timestamp.format.epoch";
 
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index c0212607d56..25d4ebac53f 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -2004,7 +2004,7 @@ possible configurations would overwhelm and obscure the important.
   </property>
   <property>
     <name>hbase.namedqueue.provider.classes</name>
-    <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService</value>
+    <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService,org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService</value>
     <description>
       Default values for NamedQueueService implementors. This comma separated full class names
       represent all implementors of NamedQueueService that we would like to be invoked by
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java
new file mode 100644
index 00000000000..8bd95aefe8e
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface MetricsWALEventTrackerSource extends BaseSource {
+  /**
+   * The name of the metrics
+   */
+  String METRICS_NAME = "WALEventTracker";
+
+  /**
+   * The name of the metrics context that metrics will be under.
+   */
+  String METRICS_CONTEXT = "regionserver";
+
+  /**
+   * Description
+   */
+  String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WALEventTracker";
+
+  /**
+   * The name of the metrics context that metrics will be under in jmx
+   */
+  String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  String NUM_FAILED_PUTS = "numFailedPuts";
+  String NUM_FAILED_PUTS_DESC = "Number of put requests that failed";
+
+  String NUM_RECORDS_FAILED_PUTS = "numRecordsFailedPuts";
+  String NUM_RECORDS_FAILED_PUTS_DESC = "number of records in failed puts";
+
+  /**
+   * Increment two counters: numFailedPuts (by one) and numRecordsFailedPuts (by numRecords).
+   */
+  void incrFailedPuts(long numRecords);
+
+  /**
+   * Get the failed puts counter.
+   */
+  long getFailedPuts();
+
+  /**
+   * Get the number of records in failed puts.
+   */
+  long getNumRecordsFailedPuts();
+}
diff --git a/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource
new file mode 100644
index 00000000000..5870bf1a9cf
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSourceImpl
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java
new file mode 100644
index 00000000000..0ae5b12c4d6
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class MetricsWALEventTrackerSourceImpl extends BaseSourceImpl
+  implements MetricsWALEventTrackerSource {
+
+  private final MutableFastCounter numFailedPutsCount;
+  private final MutableFastCounter numRecordsFailedPutsCount;
+
+  public MetricsWALEventTrackerSourceImpl() {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+  }
+
+  public MetricsWALEventTrackerSourceImpl(String metricsName, String metricsDescription,
+    String metricsContext, String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+    numFailedPutsCount =
+      this.getMetricsRegistry().newCounter(NUM_FAILED_PUTS, NUM_FAILED_PUTS_DESC, 0L);
+    numRecordsFailedPutsCount = this.getMetricsRegistry().newCounter(NUM_RECORDS_FAILED_PUTS,
+      NUM_RECORDS_FAILED_PUTS_DESC, 0L);
+  }
+
+  @Override
+  public void incrFailedPuts(long numRecords) {
+    numFailedPutsCount.incr();
+    numRecordsFailedPutsCount.incr(numRecords);
+  }
+
+  @Override
+  public long getFailedPuts() {
+    return numFailedPutsCount.value();
+  }
+
+  @Override
+  public long getNumRecordsFailedPuts() {
+    return numRecordsFailedPutsCount.value();
+  }
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 48a108bb8a7..ba12dcf3edf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -182,3 +182,12 @@ message RegionEventDescriptor {
  */
 message WALTrailer {
 }
+
+/**
+ * Special WAL entry for replication marker event.
+ */
+message ReplicationMarkerDescriptor {
+  required string region_server_name = 1;
+  required string wal_name = 2;
+  required uint64 offset = 3;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index c49c0db0672..5a9d9fc1aef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
 import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
 import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
@@ -214,6 +215,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
+import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.SecurityConstants;
@@ -1243,6 +1245,10 @@ public class HMaster extends HRegionServer implements MasterServices {
     final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
     slowLogMasterService.init();
 
+    WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
+    // Create REPLICATION.SINK_TRACKER table if needed.
+    ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);
+
     // clear the dead servers with same host name and port of online server because we are not
     // removing dead server with same hostname and port of rs which is trying to check in before
     // master initialization. See HBASE-5916.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java
new file mode 100644
index 00000000000..8ad1b93f77e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.waleventtracker;
+
+import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME_STR;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * WALEventTracker Table creation to be used by HMaster
+ */
+@InterfaceAudience.Private
+public final class WALEventTrackerTableCreator {
+  private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableCreator.class);
+
+  public static final String WAL_EVENT_TRACKER_ENABLED_KEY =
+    "hbase.regionserver.wal.event.tracker.enabled";
+  public static final boolean WAL_EVENT_TRACKER_ENABLED_DEFAULT = false;
+
+  /** The walEventTracker info family as a string */
+  private static final String WAL_EVENT_TRACKER_INFO_FAMILY_STR = "info";
+
+  /** The walEventTracker info family in array of bytes */
+  public static final byte[] WAL_EVENT_TRACKER_INFO_FAMILY =
+    Bytes.toBytes(WAL_EVENT_TRACKER_INFO_FAMILY_STR);
+
+  private static final long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds
+
+  private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder
+    .newBuilder(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME).setRegionReplication(1)
+    .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(WAL_EVENT_TRACKER_INFO_FAMILY)
+      .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1)
+      .setTimeToLive((int) TTL).build());
+
+  /* Private default constructor */
+  private WALEventTrackerTableCreator() {
+  }
+
+  /*
+   * We will create this table only if hbase.regionserver.wal.event.tracker.enabled is true and
+   * the table doesn't already exist.
+   */
+  public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
+    throws IOException {
+    boolean walEventTrackerEnabled =
+      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+    if (!walEventTrackerEnabled) {
+      LOG.info("wal event tracker requests logging to table " + WAL_EVENT_TRACKER_TABLE_NAME_STR
+        + " is disabled. Quitting.");
+      return;
+    }
+    if (
+      !masterServices.getTableDescriptors()
+        .exists(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME)
+    ) {
+      LOG.info(WAL_EVENT_TRACKER_TABLE_NAME_STR + " table not found. Creating.");
+      masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
index ed4b470d577..2d6f5bf5734 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -70,7 +71,8 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
         namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
       } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
         | InvocationTargetException e) {
-        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz);
+        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz,
+          e);
       }
     }
   }
@@ -105,8 +107,8 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
    * Add all in memory queue records to system table. The implementors can use system table or
    * direct HDFS file or ZK as persistence system.
    */
-  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
-    namedQueueServices.get(namedQueueEvent).persistAll();
+  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
+    namedQueueServices.get(namedQueueEvent).persistAll(connection);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
index ba2eb3322d6..39cc093b2aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
@@ -29,7 +29,8 @@ public class NamedQueuePayload {
   public enum NamedQueueEvent {
     SLOW_LOG(0),
     BALANCE_DECISION(1),
-    BALANCE_REJECTION(2);
+    BALANCE_REJECTION(2),
+    WAL_EVENT_TRACKER(3);
 
     private final int value;
 
@@ -48,6 +49,9 @@ public class NamedQueuePayload {
         case 2: {
           return BALANCE_REJECTION;
         }
+        case 3: {
+          return WAL_EVENT_TRACKER;
+        }
         default: {
           throw new IllegalArgumentException(
             "NamedQueue event with ordinal " + value + " not defined");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
index efe512b1a85..e079cef9745 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
@@ -22,6 +22,7 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.hadoop.hbase.util.Threads;
@@ -60,7 +61,7 @@ public class NamedQueueRecorder {
 
     // disruptor initialization with BlockingWaitStrategy
     this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
-      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
+      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".named-queue-events-pool-%d")
         .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
       ProducerType.MULTI, new BlockingWaitStrategy());
     this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
@@ -139,10 +140,9 @@ public class NamedQueueRecorder {
    * Add all in memory queue records to system table. The implementors can use system table or
    * direct HDFS file or ZK as persistence system.
    */
-  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
     if (this.logEventHandler != null) {
-      this.logEventHandler.persistAll(namedQueueEvent);
+      this.logEventHandler.persistAll(namedQueueEvent, connection);
     }
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
index 889323d9592..e0504a3c495 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.namequeues;
 
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -58,5 +59,5 @@ public interface NamedQueueService {
    * Add all in memory queue records to system table. The implementors can use system table or
    * direct HDFS file or ZK as persistence system.
    */
-  void persistAll();
+  void persistAll(Connection connection);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java
similarity index 61%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java
index 0de6c876989..67974681252 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.namequeues;
 
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,11 +28,16 @@ import org.slf4j.LoggerFactory;
  * Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table
  */
 @InterfaceAudience.Private
-public class SlowLogTableOpsChore extends ScheduledChore {
+public class NamedQueueServiceChore extends ScheduledChore {
 
-  private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
+  private static final Logger LOG = LoggerFactory.getLogger(NamedQueueServiceChore.class);
+  public static final String NAMED_QUEUE_CHORE_DURATION_KEY =
+    "hbase.regionserver.named.queue.chore.duration";
+  // 10 mins default.
+  public static final int NAMED_QUEUE_CHORE_DURATION_DEFAULT = 10 * 60 * 1000;
 
   private final NamedQueueRecorder namedQueueRecorder;
+  private final Connection connection;
 
   /**
    * Chore Constructor
@@ -41,21 +47,23 @@ public class SlowLogTableOpsChore extends ScheduledChore {
    *                           scheduled
    * @param namedQueueRecorder {@link NamedQueueRecorder} instance
    */
-  public SlowLogTableOpsChore(final Stoppable stopper, final int period,
-    final NamedQueueRecorder namedQueueRecorder) {
-    super("SlowLogTableOpsChore", stopper, period);
+  public NamedQueueServiceChore(final Stoppable stopper, final int period,
+    final NamedQueueRecorder namedQueueRecorder, Connection connection) {
+    super("NamedQueueServiceChore", stopper, period);
     this.namedQueueRecorder = namedQueueRecorder;
+    this.connection = connection;
   }
 
   @Override
   protected void chore() {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("SlowLog Table Ops Chore is starting up.");
-    }
-    namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("SlowLog Table Ops Chore is closing.");
+    for (NamedQueuePayload.NamedQueueEvent event : NamedQueuePayload.NamedQueueEvent.values()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Starting chore for event %s", event.name()));
+      }
+      namedQueueRecorder.persistAll(event, connection);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Stopping chore for event %s", event.name()));
+      }
     }
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
index 95c1ed53f52..b4104e6008f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class SlowLogPersistentService {
   /**
    * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
    */
-  public void addAllLogsToSysTable() {
+  public void addAllLogsToSysTable(Connection connection) {
     if (queueForSysTable == null) {
       LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
       return;
@@ -82,13 +83,13 @@ public class SlowLogPersistentService {
         slowLogPayloads.add(queueForSysTable.poll());
         i++;
         if (i == SYSTABLE_PUT_BATCH_SIZE) {
-          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
           slowLogPayloads.clear();
           i = 0;
         }
       }
       if (slowLogPayloads.size() > 0) {
-        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
       }
     } finally {
       LOCK.unlock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java
new file mode 100644
index 00000000000..9f549a72e51
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Immutable {@link NamedQueuePayload} carrying the details of one WAL event: the region server
+ * name, the WAL name, the event timestamp, the WAL state and the WAL length, exactly as supplied
+ * to the constructor.
+ */
+@InterfaceAudience.Private
+public class WALEventTrackerPayload extends NamedQueuePayload {
+
+  private final String rsName;
+  private final String walName;
+  private final long timeStamp;
+  private final String state;
+  private final long walLength;
+
+  /**
+   * Builds a payload registered under the {@code WAL_EVENT_TRACKER} named queue event.
+   * @param rsName    region server name recorded for the event
+   * @param walName   name of the WAL the event refers to
+   * @param timeStamp timestamp recorded for the event
+   * @param state     WAL state recorded for the event
+   * @param walLength WAL length recorded for the event
+   */
+  public WALEventTrackerPayload(String rsName, String walName, long timeStamp, String state,
+    long walLength) {
+    super(NamedQueueEvent.WAL_EVENT_TRACKER.getValue());
+    this.rsName = rsName;
+    this.walName = walName;
+    this.timeStamp = timeStamp;
+    this.state = state;
+    this.walLength = walLength;
+  }
+
+  /** Returns the region server name given at construction time. */
+  public String getRsName() {
+    return this.rsName;
+  }
+
+  /** Returns the WAL name given at construction time. */
+  public String getWalName() {
+    return this.walName;
+  }
+
+  /** Returns the event timestamp given at construction time. */
+  public long getTimeStamp() {
+    return this.timeStamp;
+  }
+
+  /** Returns the WAL state given at construction time. */
+  public String getState() {
+    return this.state;
+  }
+
+  /** Returns the WAL length given at construction time. */
+  public long getWalLength() {
+    return this.walLength;
+  }
+
+  /**
+   * Renders the payload as {@code SimpleClassName[rsName=..., walName=..., timeStamp=...,
+   * walState=..., walLength=...]}.
+   */
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "[rsName=" + rsName + ", walName=" + walName
+      + ", timeStamp=" + timeStamp + ", walState=" + state + ", walLength=" + walLength + "]";
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java
new file mode 100644
index 00000000000..ee57e23ab99
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+
+/**
+ * Named queue service that buffers {@link WALEventTrackerPayload}s handed over by
+ * {@link #consumeEventFromDisruptor(NamedQueuePayload)} in a bounded {@link EvictingQueue} until
+ * {@link #persistAll(Connection)} drains them and passes them to
+ * {@link WALEventTrackerTableAccessor}.
+ * <p>
+ * The backing queue is not thread-safe, so every access to it is done while holding this
+ * object's monitor.
+ */
+@InterfaceAudience.Private
+public class WALEventTrackerQueueService implements NamedQueueService {
+
+  // Only created when the tracker is enabled; stays null otherwise.
+  private EvictingQueue<WALEventTrackerPayload> queue;
+  private static final String WAL_EVENT_TRACKER_RING_BUFFER_SIZE =
+    "hbase.regionserver.wal.event.tracker.ringbuffer.size";
+  private final boolean walEventTrackerEnabled;
+  private int queueSize;
+  private MetricsWALEventTrackerSource source = null;
+
+  private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerQueueService.class);
+
+  /**
+   * Creates the service with the metrics source obtained from
+   * {@link CompatibilitySingletonFactory}.
+   * @param conf configuration to read the enabled flag and queue size from
+   */
+  public WALEventTrackerQueueService(Configuration conf) {
+    this(conf, null);
+  }
+
+  /**
+   * Creates the service. When the tracker is disabled in {@code conf} no queue or metrics source
+   * is set up and all operations become no-ops.
+   * @param conf   configuration to read the enabled flag and queue size (default 256) from
+   * @param source metrics source to report failed puts to; when {@code null} the instance from
+   *               {@link CompatibilitySingletonFactory} is used
+   */
+  public WALEventTrackerQueueService(Configuration conf, MetricsWALEventTrackerSource source) {
+    this.walEventTrackerEnabled =
+      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+    if (!walEventTrackerEnabled) {
+      return;
+    }
+
+    this.queueSize = conf.getInt(WAL_EVENT_TRACKER_RING_BUFFER_SIZE, 256);
+    queue = EvictingQueue.create(queueSize);
+    if (source == null) {
+      this.source = CompatibilitySingletonFactory.getInstance(MetricsWALEventTrackerSource.class);
+    } else {
+      this.source = source;
+    }
+  }
+
+  @Override
+  public NamedQueuePayload.NamedQueueEvent getEvent() {
+    return NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER;
+  }
+
+  /**
+   * Adds the payload to the queue when the tracker is enabled. Payloads that are not
+   * {@link WALEventTrackerPayload}s are logged and dropped.
+   * @param namedQueuePayload payload to enqueue
+   */
+  @Override
+  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
+    if (!walEventTrackerEnabled) {
+      return;
+    }
+    if (!(namedQueuePayload instanceof WALEventTrackerPayload)) {
+      LOG.warn("WALEventTrackerQueueService: NamedQueuePayload is not of type"
+        + " WALEventTrackerPayload.");
+      return;
+    }
+
+    WALEventTrackerPayload payload = (WALEventTrackerPayload) namedQueuePayload;
+    // Parameterized logging: the payload is only rendered when debug is enabled.
+    LOG.debug("Adding wal event tracker payload {}", payload);
+    addToQueue(payload);
+  }
+
+  /*
+   * Package-private (rather than private) so that tests can add payloads directly.
+   */
+  synchronized void addToQueue(WALEventTrackerPayload payload) {
+    queue.add(payload);
+  }
+
+  /**
+   * Drops every buffered payload.
+   * @return {@code true} when the tracker is enabled and the queue was cleared, {@code false}
+   *         when the tracker is disabled
+   */
+  @Override
+  public boolean clearNamedQueue() {
+    if (!walEventTrackerEnabled) {
+      return false;
+    }
+    LOG.debug("Clearing wal event tracker queue");
+    // Same lock as addToQueue/getWALEventTrackerList: EvictingQueue is not thread-safe.
+    synchronized (this) {
+      queue.clear();
+    }
+    return true;
+  }
+
+  /**
+   * Reading records back is not implemented by this service.
+   * @return always {@code null}
+   */
+  @Override
+  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+    return null;
+  }
+
+  /**
+   * Drains the buffered payloads and writes them through
+   * {@link WALEventTrackerTableAccessor#addWalEventTrackerRows(Queue, Connection)}. This is best
+   * effort: when the write fails the drained payloads are dropped and the failed-puts metric is
+   * incremented by their count.
+   * @param connection connection used to write the rows
+   */
+  @Override
+  public void persistAll(Connection connection) {
+    if (!walEventTrackerEnabled) {
+      return;
+    }
+    // Drain under the lock first and inspect the private copy afterwards, so the shared queue is
+    // never read without synchronization.
+    Queue<WALEventTrackerPayload> pending = getWALEventTrackerList();
+    if (pending.isEmpty()) {
+      LOG.debug("Wal Event tracker queue is empty.");
+      return;
+    }
+
+    try {
+      WALEventTrackerTableAccessor.addWalEventTrackerRows(pending, connection);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Do not lose the interrupt: restore the flag for the calling thread.
+        Thread.currentThread().interrupt();
+      }
+      // If we fail to persist the records with retries then just forget about them.
+      // This is a best effort service.
+      LOG.error("Failed while persisting wal tracker records", e);
+      // Increment metrics for failed puts
+      source.incrFailedPuts(pending.size());
+    }
+  }
+
+  /**
+   * Atomically moves all buffered payloads into a new queue and empties the buffer.
+   * @return the drained payloads in iteration order of the buffer, possibly empty
+   */
+  private synchronized Queue<WALEventTrackerPayload> getWALEventTrackerList() {
+    Queue<WALEventTrackerPayload> drained = new ArrayDeque<>();
+    Iterator<WALEventTrackerPayload> iterator = queue.iterator();
+    while (iterator.hasNext()) {
+      drained.add(iterator.next());
+    }
+    queue.clear();
+    return drained;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java
new file mode 100644
index 00000000000..46ac5e3f914
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_INFO_FAMILY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Static helpers that turn {@link WALEventTrackerPayload}s into {@link Put}s and write them, with
+ * retries, to the {@link #WAL_EVENT_TRACKER_TABLE_NAME} table.
+ */
+@InterfaceAudience.Private
+public final class WALEventTrackerTableAccessor {
+  public static final String RS_COLUMN = "region_server_name";
+  public static final String WAL_NAME_COLUMN = "wal_name";
+  public static final String TIMESTAMP_COLUMN = "timestamp";
+  public static final String WAL_STATE_COLUMN = "wal_state";
+  public static final String WAL_LENGTH_COLUMN = "wal_length";
+  public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts";
+  public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec";
+  public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec";
+  public static final int DEFAULT_MAX_ATTEMPTS = 3;
+  public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second
+  public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds
+  public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER";
+  public static final String DELIMITER = "_";
+
+  /**
+   * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config -
+   * hbase.regionserver.wal.event.tracker.enabled
+   */
+  public static final TableName WAL_EVENT_TRACKER_TABLE_NAME =
+    TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR);
+
+  private WALEventTrackerTableAccessor() {
+  }
+
+  /**
+   * Writes the puts to the tracker table, retrying on {@link IOException} according to the retry
+   * settings read from the connection's configuration.
+   * @param connection connection used to obtain the table
+   * @param puts       puts to write
+   * @throws IOException          the last failure once the retry counter refuses another retry
+   * @throws InterruptedException if interrupted while sleeping between retries
+   */
+  private static void doPut(final Connection connection, final List<Put> puts)
+    throws IOException, InterruptedException {
+    RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create();
+    while (true) {
+      try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) {
+        table.put(puts);
+        return;
+      } catch (IOException ioe) {
+        // Rethrows ioe when no retries are left; otherwise falls through to the sleep below.
+        retryOrThrow(retryCounter, ioe);
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
+
+  /**
+   * Builds a retry counter factory with exponential backoff (with limit) from
+   * {@link #MAX_ATTEMPTS_KEY}, {@link #SLEEP_INTERVAL_KEY} and {@link #MAX_SLEEP_TIME_KEY},
+   * falling back to the corresponding {@code DEFAULT_*} constants.
+   */
+  private static RetryCounterFactory getRetryFactory(Configuration conf) {
+    int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS);
+    long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
+    long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME);
+    RetryCounter.RetryConfig retryConfig =
+      new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs,
+        TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit());
+    return new RetryCounterFactory(retryConfig);
+  }
+
+  /**
+   * Returns normally when the counter allows another retry, otherwise rethrows {@code ioe}.
+   */
+  private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException {
+    if (!retryCounter.shouldRetry()) {
+      throw ioe;
+    }
+  }
+
+  /**
+   * Add wal event tracker rows to the {@link #WAL_EVENT_TRACKER_TABLE_NAME} table. One row is
+   * written per payload, keyed by {@link #getRowKey(WALEventTrackerPayload)}. An empty queue is a
+   * no-op and does not touch the table.
+   * @param walEventPayloads Queue of wal events to process; it is only iterated, not drained
+   * @param connection       Connection to use.
+   * @throws Exception if the rows could not be written after the configured retries, or the
+   *                   thread was interrupted while waiting to retry
+   */
+  public static void addWalEventTrackerRows(Queue<WALEventTrackerPayload> walEventPayloads,
+    final Connection connection) throws Exception {
+    if (walEventPayloads.isEmpty()) {
+      // Nothing to persist; avoid opening the table for an empty batch.
+      return;
+    }
+    List<Put> puts = new ArrayList<>(walEventPayloads.size());
+    for (WALEventTrackerPayload payload : walEventPayloads) {
+      final byte[] rowKey = getRowKey(payload);
+      final Put put = new Put(rowKey);
+      // TODO Do we need to SKIP_WAL ?
+      put.setPriority(HConstants.NORMAL_QOS);
+      put
+        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN),
+          Bytes.toBytes(payload.getRsName()))
+        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN),
+          Bytes.toBytes(payload.getWalName()))
+        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN),
+          Bytes.toBytes(payload.getTimeStamp()))
+        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN),
+          Bytes.toBytes(payload.getState()))
+        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN),
+          Bytes.toBytes(payload.getWalLength()));
+      puts.add(put);
+    }
+    doPut(connection, puts);
+  }
+
+  /**
+   * Create rowKey as {@code <walName>_<timestamp>_<walState>}: 1. We want RS name to be the
+   * leading part of rowkey so that we can query by RS name filter. WAL name contains rs name as a
+   * leading part. 2. Timestamp when the event was generated. 3. Add state of the wal. The
+   * combination of 1 + 2 + 3 is intended to produce a unique rowkey.
+   * @param payload payload to process
+   * @return rowKey byte[]
+   */
+  public static byte[] getRowKey(final WALEventTrackerPayload payload) {
+    String walName = payload.getWalName();
+    // converting to string since this will help seeing the timestamp in string format using
+    // hbase shell commands.
+    String timestampStr = String.valueOf(payload.getTimeStamp());
+    String walState = payload.getState();
+    final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState;
+    return Bytes.toBytes(rowKeyStr);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
index 45bfca11270..885e2d44279 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
@@ -24,6 +24,7 @@ import java.util.Queue;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.BalancerDecision;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
 import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
@@ -141,7 +142,7 @@ public class BalancerDecisionQueueService implements NamedQueueService {
   }
 
   @Override
-  public void persistAll() {
+  public void persistAll(Connection connection) {
     // no-op for now
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
index 79b7325b305..fb94db2b917 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
@@ -24,6 +24,7 @@ import java.util.Queue;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.BalancerRejection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
 import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
@@ -127,7 +128,7 @@ public class BalancerRejectionQueueService implements NamedQueueService {
   }
 
   @Override
-  public void persistAll() {
+  public void persistAll(Connection connection) {
     // no-op for now
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
index 03b6aa719ea..86b24e9d975 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.SlowLogParams;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
@@ -223,12 +224,12 @@ public class SlowLogQueueService implements NamedQueueService {
    * table.
    */
   @Override
-  public void persistAll() {
+  public void persistAll(Connection connection) {
     if (!isOnlineLogProviderEnabled) {
       return;
     }
     if (slowLogPersistentService != null) {
-      slowLogPersistentService.addAllLogsToSysTable();
+      slowLogPersistentService.addAllLogsToSysTable(connection);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b3bda6c27d7..c3589d57df1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -19,8 +19,17 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
+import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
+import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
 
 import io.opentelemetry.api.trace.Span;
@@ -129,12 +138,11 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
-import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
+import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -156,7 +164,10 @@ import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
 import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.SecurityConstants;
@@ -457,7 +468,7 @@ public class HRegionServer extends Thread
 
   private final RegionServerAccounting regionServerAccounting;
 
-  private SlowLogTableOpsChore slowLogTableOpsChore = null;
+  private NamedQueueServiceChore namedQueueServiceChore = null;
 
   // Block cache
   private BlockCache blockCache;
@@ -590,6 +601,11 @@ public class HRegionServer extends Thread
   // A timer to shutdown the process if abort takes too long
   private Timer abortMonitor;
 
+  /*
+   * Chore that creates replication marker rows.
+   */
+  private ReplicationMarkerChore replicationMarkerChore;
+
   /**
    * Starts a HRegionServer at the default location.
    * <p/>
@@ -636,7 +652,7 @@ public class HRegionServer extends Thread
       this.abortRequested = new AtomicBoolean(false);
       this.stopped = false;
 
-      initNamedQueueRecorder(conf);
+      this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
       rpcServices = createRpcServices();
       useThisHostnameInstead = getUseThisHostnameInstead(conf);
 
@@ -725,26 +741,6 @@ public class HRegionServer extends Thread
     }
   }
 
-  private void initNamedQueueRecorder(Configuration conf) {
-    if (!(this instanceof HMaster)) {
-      final boolean isOnlineLogProviderEnabled = conf.getBoolean(
-        HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
-      if (isOnlineLogProviderEnabled) {
-        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
-      }
-    } else {
-      final boolean isBalancerDecisionRecording =
-        conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
-          BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
-      final boolean isBalancerRejectionRecording =
-        conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
-          BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
-      if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
-        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
-      }
-    }
-  }
-
   // HMaster should override this method to load the specific config for master
   protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
     String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
@@ -1036,6 +1032,17 @@ public class HRegionServer extends Thread
       || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
   }
 
+  /**
+   * Creates the {@link ReplicationMarkerChore} when the replication marker is enabled in the
+   * configuration, using the configured chore period; otherwise does nothing.
+   */
+  private void initializeReplicationMarkerChore() {
+    if (!conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT)) {
+      // Replication marker is not enabled, so there is no chore to create.
+      return;
+    }
+    final int choreInterval = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
+      REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
+    replicationMarkerChore = new ReplicationMarkerChore(this, this, choreInterval);
+  }
+
   /**
    * The HRegionServer sticks in this loop until closed.
    */
@@ -2049,9 +2056,23 @@ public class HRegionServer extends Thread
     }
     // Instantiate replication if replication enabled. Pass it the log directories.
     createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
+
+    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
+    if (walEventListener != null && factory.getWALProvider() != null) {
+      factory.getWALProvider().addWALActionsListener(walEventListener);
+    }
     this.walFactory = factory;
   }
 
+  /**
+   * Builds the WAL event tracker listener for this server.
+   * @param conf configuration to read the tracker enabled flag from
+   * @return a new {@link WALEventTrackerListener}, or {@code null} when the tracker is disabled
+   */
+  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
+    final boolean trackerEnabled =
+      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+    if (!trackerEnabled) {
+      return null;
+    }
+    return new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
+  }
+
   /**
    * Start up replication source and sink handlers.
    */
@@ -2205,16 +2226,19 @@ public class HRegionServer extends Thread
     if (this.fsUtilizationChore != null) {
       choreService.scheduleChore(fsUtilizationChore);
     }
-    if (this.slowLogTableOpsChore != null) {
-      choreService.scheduleChore(slowLogTableOpsChore);
+    if (this.namedQueueServiceChore != null) {
+      choreService.scheduleChore(namedQueueServiceChore);
     }
     if (this.brokenStoreFileCleaner != null) {
       choreService.scheduleChore(brokenStoreFileCleaner);
     }
-
     if (this.rsMobFileCleanerChore != null) {
       choreService.scheduleChore(rsMobFileCleanerChore);
     }
+    if (replicationMarkerChore != null) {
+      LOG.info("Starting replication marker chore");
+      choreService.scheduleChore(replicationMarkerChore);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -2262,10 +2286,22 @@ public class HRegionServer extends Thread
 
     final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
       HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
-    if (isSlowLogTableEnabled) {
+    final boolean walEventTrackerEnabled =
+      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+
+    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
       // default chore duration: 10 min
-      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
-      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
+      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
+      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
+        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
+
+      final int namedQueueChoreDuration =
+        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
+      // Considering min of slowLogChoreDuration and namedQueueChoreDuration
+      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
+
+      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
+        this.namedQueueRecorder, this.getConnection());
     }
 
     if (this.nonceManager != null) {
@@ -2315,6 +2351,7 @@ public class HRegionServer extends Thread
     this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
 
     registerConfigurationObservers();
+    initializeReplicationMarkerChore();
   }
 
   private void registerConfigurationObservers() {
@@ -2795,7 +2832,8 @@ public class HRegionServer extends Thread
       shutdownChore(healthCheckChore);
       shutdownChore(storefileRefresher);
       shutdownChore(fsUtilizationChore);
-      shutdownChore(slowLogTableOpsChore);
+      shutdownChore(namedQueueServiceChore);
+      shutdownChore(replicationMarkerChore);
       shutdownChore(rsMobFileCleanerChore);
       // cancel the remaining scheduled chores (in case we missed out any)
       // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 3d7678c37c6..18b8f7990c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -601,7 +601,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     return newPath;
   }
 
-  Path getOldPath() {
+  public Path getOldPath() {
     long currentFilenum = this.filenum.get();
     Path oldPath = null;
     if (currentFilenum > 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java
new file mode 100644
index 00000000000..487c7de4170
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.WALEventTrackerPayload;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * WAL listener that adds a {@link WALEventTrackerPayload} to the recorder on each log roll step.
+ */
+@InterfaceAudience.Private
+public class WALEventTrackerListener implements WALActionsListener {
+  private final Configuration conf;
+  private final NamedQueueRecorder namedQueueRecorder;
+  private final String serverName;
+
+  /** States recorded for a WAL file before and after a log roll. */
+  public enum WalState {
+    ROLLING,
+    ROLLED,
+    ACTIVE
+  }
+
+  /**
+   * @param conf               configuration used to resolve the filesystem of a rolled WAL
+   * @param namedQueueRecorder recorder that receives the payloads
+   * @param serverName         server whose hostname is stored in every payload
+   */
+  public WALEventTrackerListener(Configuration conf, NamedQueueRecorder namedQueueRecorder,
+    ServerName serverName) {
+    this.conf = conf;
+    this.namedQueueRecorder = namedQueueRecorder;
+    this.serverName = serverName.getHostname();
+  }
+
+  /** Records the outgoing WAL as ROLLING; a null oldPath (first WAL) is skipped. */
+  @Override
+  public void preLogRoll(Path oldPath, Path newPath) {
+    if (oldPath == null) {
+      return;
+    }
+    // Only the last path component is stored, not the filesystem scheme or the WAL directory.
+    namedQueueRecorder.addRecord(newPayload(oldPath.getName(), WalState.ROLLING, 0L));
+  }
+
+  /** Records the old WAL (when present) as ROLLED with its length, then the new WAL as ACTIVE. */
+  @Override
+  public void postLogRoll(Path oldPath, Path newPath) {
+    if (oldPath != null) {
+      // oldPath is null for the first WAL, in which case only the ACTIVE record is added.
+      namedQueueRecorder
+        .addRecord(newPayload(oldPath.getName(), WalState.ROLLED, lengthOrZero(oldPath)));
+    }
+    namedQueueRecorder.addRecord(newPayload(newPath.getName(), WalState.ACTIVE, 0L));
+  }
+
+  /** Returns the length of the given file, or 0 if it cannot be determined. */
+  private long lengthOrZero(Path walPath) {
+    try {
+      return walPath.getFileSystem(conf).getFileStatus(walPath).getLen();
+    } catch (IOException ignored) {
+      // Saving wal length is best effort. In case of any exception just ignore.
+      return 0L;
+    }
+  }
+
+  /** Builds a payload for this server stamped with the current time. */
+  private WALEventTrackerPayload newPayload(String walName, WalState state, long walLength) {
+    return new WALEventTrackerPayload(serverName, walName, EnvironmentEdgeManager.currentTime(),
+      state.name(), walLength);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index d89d03a145c..cf41beb1074 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -217,4 +221,12 @@ public class WALUtil {
       cells.trimToSize();
     }
   }
+
+  public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc,
+    RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    scopes.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL); // GLOBAL so the marker can replicate
+    WALEdit markerEdit = WALEdit.createReplicationMarkerEdit(rowKey, timestamp);
+    writeMarker(wal, scopes, regionInfo, markerEdit, mvcc, null);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java
new file mode 100644
index 00000000000..38cf33090d9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if the config key
+ * hbase.regionserver.replication.sink.tracker.enabled is true and the table does not exist yet.
+ */
+@InterfaceAudience.Private
+public final class ReplicationSinkTrackerTableCreator {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(ReplicationSinkTrackerTableCreator.class);
+  private static final long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds
+
+  public static final byte[] RS_COLUMN = Bytes.toBytes("region_server_name");
+  public static final byte[] WAL_NAME_COLUMN = Bytes.toBytes("wal_name");
+  public static final byte[] TIMESTAMP_COLUMN = Bytes.toBytes("timestamp");
+  public static final byte[] OFFSET_COLUMN = Bytes.toBytes("offset");
+
+  /** Config key that enables creation of {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR}. */
+  public static final String REPLICATION_SINK_TRACKER_ENABLED_KEY =
+    "hbase.regionserver.replication.sink.tracker.enabled";
+  public static final boolean REPLICATION_SINK_TRACKER_ENABLED_DEFAULT = false;
+
+  /** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family as a string. */
+  private static final String REPLICATION_SINK_TRACKER_INFO_FAMILY_STR = "info";
+
+  /** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family as an array of bytes. */
+  public static final byte[] REPLICATION_SINK_TRACKER_INFO_FAMILY =
+    Bytes.toBytes(REPLICATION_SINK_TRACKER_INFO_FAMILY_STR);
+
+  public static final String REPLICATION_SINK_TRACKER_TABLE_NAME_STR = "REPLICATION.SINK_TRACKER";
+
+  /* Private constructor: this class only exposes static members. */
+  private ReplicationSinkTrackerTableCreator() {
+  }
+
+  /**
+   * {@link TableName} of the {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table, which is
+   * created only when hbase.regionserver.replication.sink.tracker.enabled is true.
+   */
+  public static final TableName REPLICATION_SINK_TRACKER_TABLE_NAME =
+    TableName.valueOf(REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
+
+  private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder
+    .newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).setRegionReplication(1)
+    .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(REPLICATION_SINK_TRACKER_INFO_FAMILY)
+      .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1)
+      .setTimeToLive((int) TTL).build());
+
+  /*
+   * Creates the table through masterServices only if
+   * hbase.regionserver.replication.sink.tracker.enabled is true and the table does not exist yet.
+   */
+  public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
+    throws IOException {
+    boolean replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
+      REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
+    if (!replicationSinkTrackerEnabled) {
+      LOG.info("replication sink tracker requests logging to table {} is disabled." + " Quitting.",
+        REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
+      return;
+    }
+    if (!masterServices.getTableDescriptors().exists(REPLICATION_SINK_TRACKER_TABLE_NAME)) {
+      LOG.info("{} table not found. Creating.", REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
+      masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java
new file mode 100644
index 00000000000..b14f546540f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chore that periodically writes a replication marker row: a special WALEdit with family
+ * {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY}, column qualifier
+ * {@link WALEdit#REPLICATION_MARKER} and an empty value. If config key
+ * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, one marker row is created every
+ * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms.
+ * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate
+ * the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
+ * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
+ * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
+ * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
+ * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
+ * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table.
+ */
+@InterfaceAudience.Private
+public class ReplicationMarkerChore extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class);
+  private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
+  public static final RegionInfo REGION_INFO =
+    RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build();
+  private static final String DELIMITER = "_";
+  private final RegionServerServices rsServices;
+  private WAL wal;
+
+  public static final String REPLICATION_MARKER_ENABLED_KEY =
+    "hbase.regionserver.replication.marker.enabled";
+  public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false;
+
+  public static final String REPLICATION_MARKER_CHORE_DURATION_KEY =
+    "hbase.regionserver.replication.marker.chore.duration";
+  public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds
+
+  public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices,
+    int period) {
+    super("ReplicationTrackerChore", stopper, period);
+    this.rsServices = rsServices;
+  }
+
+  @Override
+  protected void chore() {
+    if (wal == null && !initWal()) {
+      // Shouldn't happen. Ignore and wait for the next chore run.
+      return;
+    }
+    // Only the timestamp is set here; the WAL reader adds wal name, server name and offset later.
+    String rsName = rsServices.getServerName().getServerName();
+    long markerTs = EnvironmentEdgeManager.currentTime();
+    byte[] markerRow = getRowKey(rsName, markerTs);
+    LOG.trace("Creating replication marker edit.");
+    // This creates a new ArrayList of all the online regions for every call.
+    List<? extends Region> onlineRegions = rsServices.getRegions();
+    if (onlineRegions.isEmpty()) {
+      LOG.info("There are no online regions for this server, so skipping adding replication marker"
+        + " rows for this regionserver");
+      return;
+    }
+    int pick = ThreadLocalRandom.current().nextInt(onlineRegions.size());
+    RegionInfo target = onlineRegions.get(pick).getRegionInfo();
+    try {
+      WALUtil.writeReplicationMarkerAndSync(wal, MVCC, target, markerRow, markerTs);
+    } catch (IOException ioe) {
+      LOG.error("Exception while sync'ing replication tracker edit", ioe);
+      // TODO: Should we stop region server or add a metric and keep going.
+    }
+  }
+
+  /** Fetches the WAL on first use; returns false when the lookup fails with an IOException. */
+  private boolean initWal() {
+    try {
+      // TODO: We need to add support for multi WAL implementation.
+      wal = rsServices.getWAL(null);
+      return true;
+    } catch (IOException ioe) {
+      LOG.warn("Unable to get WAL ", ioe);
+      return false;
+    }
+  }
+
+  /**
+   * Creates a rowkey of the form serverName_timestamp.
+   * @param serverName region server name
+   * @param timestamp  timestamp
+   * @return the rowkey as bytes
+   */
+  public static byte[] getRowKey(String serverName, long timestamp) {
+    // The timestamp is kept as a decimal string so it is readable from hbase shell commands.
+    return Bytes.toBytes(serverName + DELIMITER + timestamp);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 3b4ec3ea8b9..7c5583a8742 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -17,6 +17,16 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
+
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -61,6 +71,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
@@ -107,6 +118,7 @@ public class ReplicationSink {
    * Row size threshold for multi requests above which a warning is logged
    */
   private final int rowSizeWarnThreshold;
+  private boolean replicationSinkTrackerEnabled;
 
   /**
    * Create a sink for replication
@@ -117,6 +129,8 @@ public class ReplicationSink {
     this.conf = HBaseConfiguration.create(conf);
     rowSizeWarnThreshold =
       conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
+    replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
+      REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
     decorateConf();
     this.metrics = new MetricsSink();
     this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -230,6 +244,18 @@ public class ReplicationSink {
                 bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
               buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
+          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
+            Mutation put = processReplicationMarkerEntry(cell);
+            if (put == null) {
+              continue;
+            }
+            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
+            List<UUID> clusterIds = new ArrayList<>();
+            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
+              clusterIds.add(toUUID(clusterId));
+            }
+            put.setClusterIds(clusterIds);
+            addToHashMultiMap(rowMap, table, clusterIds, put);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -292,6 +318,33 @@ public class ReplicationSink {
     }
   }
 
+  /*
+   * Converts a replication marker cell into a Put for the sink tracker table. Returns null when
+   * hbase.regionserver.replication.sink.tracker.enabled is false, so that the cell is ignored.
+   * Otherwise the cell value is de-serialized into a ReplicationMarkerDescriptor and the Put
+   * carries the regionserver name, wal name and offset from it plus the cell timestamp.
+   */
+  private Put processReplicationMarkerEntry(Cell cell) throws IOException {
+    // If source is emitting replication marker rows but sink is not accepting them,
+    // ignore the edits.
+    if (!replicationSinkTrackerEnabled) {
+      return null;
+    }
+    ByteArrayInputStream valueStream =
+      new ByteArrayInputStream(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+    WALProtos.ReplicationMarkerDescriptor marker =
+      WALProtos.ReplicationMarkerDescriptor.parseFrom(valueStream);
+    final long ts = cell.getTimestamp();
+    final byte[] family = REPLICATION_SINK_TRACKER_INFO_FAMILY;
+    Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+    // Every column is written at the marker cell's own timestamp.
+    put.addColumn(family, RS_COLUMN, ts, Bytes.toBytes(marker.getRegionServerName()));
+    put.addColumn(family, WAL_NAME_COLUMN, ts, Bytes.toBytes(marker.getWalName()));
+    put.addColumn(family, TIMESTAMP_COLUMN, ts, Bytes.toBytes(ts));
+    put.addColumn(family, OFFSET_COLUMN, ts, Bytes.toBytes(marker.getOffset()));
+    return put;
+  }
+
   private void buildBulkLoadHFileMap(
     final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
     BulkLoadDescriptor bld) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a827a2555fb..2db8acbd2dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -827,4 +827,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
   public String logPeerId() {
     return "peerId=" + this.getPeerId() + ",";
   }
+
+  // Visible for testing purpose. Returns the current value of the totalReplicatedEdits counter.
+  public long getTotalReplicatedEdits() {
+    return totalReplicatedEdits.get();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
index 6e5da0feffb..7337694addb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -69,6 +69,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
     if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
       return;
     }
+    // Allow replication marker row to pass through.
+    if (WALEdit.isReplicationMarkerEdit(logEdit)) {
+      return;
+    }
     // For replay, or if all the cells are markers, do not need to store replication scope.
     if (
       logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c))
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 28768679fb6..d6351ea0eab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -42,6 +44,9 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
@@ -178,6 +183,7 @@ class ReplicationSourceWALReader extends Thread {
     }
     LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
       entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
+    updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry, entrySize);
@@ -341,6 +347,10 @@ class ReplicationSourceWALReader extends Thread {
   }
 
   protected final Entry filterEntry(Entry entry) {
+    // Always replicate if this edit is Replication Marker edit.
+    if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
+      return entry;
+    }
     Entry filtered = filter.filter(entry);
     if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
       LOG.trace("Filtered entry for replication: {}", entry);
@@ -451,6 +461,36 @@ class ReplicationSourceWALReader extends Thread {
     return totalStoreFilesSize;
   }
 
+  /*
+   * Does nothing unless the entry holds a replication marker edit. For a marker edit, replaces
+   * its single cell with a new KeyValue whose value is a serialized ReplicationMarkerDescriptor
+   * holding region_server_name, wal_name and offset.
+   */
+  private void updateReplicationMarkerEdit(Entry entry, long offset) {
+    WALEdit markerEdit = entry.getEdit();
+    if (!WALEdit.isReplicationMarkerEdit(markerEdit)) {
+      // Nothing to do for regular edits.
+      return;
+    }
+    List<Cell> oldCells = markerEdit.getCells();
+    Preconditions.checkArgument(oldCells.size() == 1, "ReplicationMarker should have only 1 cell");
+    Cell marker = oldCells.get(0);
+    // Describe the marker with this server's hostname, the current wal name and the given offset.
+    WALProtos.ReplicationMarkerDescriptor descriptor =
+      WALProtos.ReplicationMarkerDescriptor.newBuilder()
+        .setRegionServerName(this.source.getServer().getServerName().getHostname())
+        .setWalName(getCurrentPath().getName())
+        .setOffset(offset).build();
+
+    // Row, family and qualifier are cloned from the existing cell into a new KeyValue that
+    // carries the serialized descriptor as its value; the edit's cell list is then swapped.
+    KeyValue populated = new KeyValue(CellUtil.cloneRow(marker), CellUtil.cloneFamily(marker),
+      CellUtil.cloneQualifier(marker), marker.getTimestamp(), descriptor.toByteArray());
+    ArrayList<Cell> replacement = new ArrayList<>();
+    replacement.add(populated);
+    markerEdit.setCells(replacement);
+  }
+
   /**
    * @param size delta size for grown buffer
    * @return true if we should clear buffer and push all
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
index bc24f8e8d3c..7a2086486bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
@@ -132,6 +132,22 @@ public class WALEdit implements HeapSize {
   @InterfaceAudience.Private
   public static final byte[] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
 
+  /**
+   * Periodically {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore}
+   * will create marker edits with family as {@link WALEdit#METAFAMILY} and
+   * {@link WALEdit#REPLICATION_MARKER} as qualifier and an empty value.
+   * org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader will populate the
+   * Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
+   * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
+   * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
+   * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
+   * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
+   * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR
+   * table.
+   */
+  @InterfaceAudience.Private
+  public static final byte[] REPLICATION_MARKER = Bytes.toBytes("HBASE::REPLICATION_MARKER");
+
   private final transient boolean replay;
 
   private ArrayList<Cell> cells;
@@ -454,4 +470,28 @@ public class WALEdit implements HeapSize {
     this.cells.add(cell);
     return this;
   }
+
+  /**
+   * Builds a replication marker edit: one Put KeyValue in the {@link #METAFAMILY} family with the
+   * {@link #REPLICATION_MARKER} qualifier and no value.
+   * @param rowKey    rowkey of the marker cell
+   * @param timestamp timestamp of the marker cell
+   * @return a new WALEdit containing only the marker cell
+   */
+  public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) {
+    return new WALEdit()
+      .add(new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put));
+  }
+
+  /**
+   * Tells whether the given edit is a replication marker edit, which always has exactly one cell.
+   * @param edit edit to inspect
+   * @return true if its only cell has family METAFAMILY and qualifier REPLICATION_MARKER
+   */
+  public static boolean isReplicationMarkerEdit(WALEdit edit) {
+    if (edit.getCells().size() != 1) {
+      return false;
+    }
+    return CellUtil.matchingColumn(edit.getCells().get(0), METAFAMILY, REPLICATION_MARKER);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index a6463094bea..ee093855bce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -318,6 +318,13 @@ public class WALSplitter {
       Entry entry;
       startTS = EnvironmentEdgeManager.currentTime();
       while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
+        if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
+          // Skip processing the replication marker edits.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring Replication marker edits.");
+          }
+          continue;
+        }
         byte[] region = entry.getKey().getEncodedRegionName();
         String encodedRegionNameAsStr = Bytes.toString(region);
         Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java
new file mode 100644
index 00000000000..d4d210879d3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
+import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALEventTracker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALEventTracker.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestWALEventTracker.class);
+  private static HBaseTestingUtility TEST_UTIL;
+  public static Configuration CONF;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    CONF = HBaseConfiguration.create();
+    CONF.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true);
+    // Set the chore for less than a second.
+    CONF.setInt(NAMED_QUEUE_CHORE_DURATION_KEY, 900);
+    CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
+    TEST_UTIL = new HBaseTestingUtility(CONF);
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    LOG.info("Calling teardown");
+    // setup() used startMiniCluster(), so stop DFS and ZooKeeper too, not only the HBase cluster.
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void waitForWalEventTrackerTableCreation() {
+    Waiter.waitFor(CONF, 10000,
+      (Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
+  }
+
+  @Test
+  public void testWALRolling() throws Exception {
+    Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection();
+    waitForWALEventTrackerTable(connection);
+    List<WAL> wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs();
+    assertEquals(1, wals.size());
+    AbstractFSWAL wal = (AbstractFSWAL) wals.get(0);
+    Path wal1Path = wal.getOldPath();
+    wal.rollWriter(true);
+
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    long wal1Length = fs.getFileStatus(wal1Path).getLen();
+    Path wal2Path = wal.getOldPath();
+    String hostName =
+      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname();
+
+    TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3);
+    List<WALEventTrackerPayload> walEventsList = getRows(hostName, connection);
+
+    // There should be at least 2 events for wal1Name, with ROLLING and ROLLED state. Most of the
+    // time we will lose the ACTIVE event for the first wal, since hmaster takes some time to
+    // create the hbase:waleventtracker table and by then the RS has already created the first
+    // wal and tried to persist the event.
+    compareEvents(hostName, wal1Path.getName(), walEventsList,
+      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(),
+        WALEventTrackerListener.WalState.ROLLED.name())),
+      false);
+
+    // There should be only 1 event for wal2Name which is current wal, with ACTIVE state
+    compareEvents(hostName, wal2Path.getName(), walEventsList,
+      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true);
+
+    // Check that event with wal1Path and state ROLLED has the wal length set.
+    checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length);
+  }
+
+  private void checkWALRolledEventHasSize(List<WALEventTrackerPayload> walEvents, String walName,
+    long actualSize) {
+    List<WALEventTrackerPayload> eventsFilteredByNameState = new ArrayList<>();
+    // Filter the list by walName and wal state.
+    for (WALEventTrackerPayload event : walEvents) {
+      if (
+        walName.equals(event.getWalName())
+          && WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState())
+      ) {
+        eventsFilteredByNameState.add(event);
+      }
+    }
+
+    assertEquals(1, eventsFilteredByNameState.size());
+    // We are not comparing the size of the WAL in the tracker table with actual size.
+    // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length
+    // will always be incorrect.
+    // For FSHLog implementation, we close the WAL in an executor thread. So there will always be
+    // a difference of trailer size bytes.
+    // assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength());
+  }
+
+  /**
+   * Compare the events from {@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME}
+   * @param hostName       hostname
+   * @param walName        walname
+   * @param walEvents      event from table
+   * @param expectedStates expected states for the hostname and wal name
+   * @param strict         whether to check strictly or not. Sometimes we lose the ACTIVE state
+   *                       event for the first wal since it takes some time for hmaster to create
+   *                       the table and by that time RS already creates the first WAL and will try
+   *                       to persist ACTIVE event to waleventtracker table.
+   */
+  private void compareEvents(String hostName, String walName,
+    List<WALEventTrackerPayload> walEvents, List<String> expectedStates, boolean strict) {
+    List<WALEventTrackerPayload> eventsFilteredByWalName = new ArrayList<>();
+
+    // Assert that all the events have the same host name i.e they came from the same RS.
+    for (WALEventTrackerPayload event : walEvents) {
+      assertEquals(hostName, event.getRsName());
+    }
+
+    // Filter the list by walName.
+    for (WALEventTrackerPayload event : walEvents) {
+      if (walName.equals(event.getWalName())) {
+        eventsFilteredByWalName.add(event);
+      }
+    }
+
+    // Assert that the list of events after filtering by walName should be same as expected states.
+    if (strict) {
+      assertEquals(expectedStates.size(), eventsFilteredByWalName.size());
+    }
+
+    for (WALEventTrackerPayload event : eventsFilteredByWalName) {
+      expectedStates.remove(event.getState());
+    }
+    assertEquals(0, expectedStates.size());
+  }
+
+  private void waitForWALEventTrackerTable(Connection connection) throws IOException {
+    TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
+  }
+
+  /** Scans the WAL event tracker table from rowKeyPrefix onwards; returns one payload per row. */
+  private List<WALEventTrackerPayload> getRows(String rowKeyPrefix, Connection connection)
+    throws IOException {
+    List<WALEventTrackerPayload> list = new ArrayList<>();
+    Scan scan = new Scan().withStartRow(Bytes.toBytes(rowKeyPrefix));
+    // try-with-resources so that the table and the scanner are closed even if the scan throws.
+    try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+      ResultScanner scanner = table.getScanner(scan)) {
+      Result r;
+      while ((r = scanner.next()) != null) {
+        list.add(getPayload(r.listCells()));
+      }
+    }
+    return list;
+  }
+
+  private WALEventTrackerPayload getPayload(List<Cell> cells) {
+    String rsName = null, walName = null, walState = null;
+    long timestamp = 0L, walLength = 0L;
+    for (Cell cell : cells) {
+      byte[] qualifier = CellUtil.cloneQualifier(cell);
+      byte[] value = CellUtil.cloneValue(cell);
+      String qualifierStr = Bytes.toString(qualifier);
+
+      if (RS_COLUMN.equals(qualifierStr)) {
+        rsName = Bytes.toString(value);
+      } else if (WAL_NAME_COLUMN.equals(qualifierStr)) {
+        walName = Bytes.toString(value);
+      } else if (WAL_STATE_COLUMN.equals(qualifierStr)) {
+        walState = Bytes.toString(value);
+      } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) {
+        timestamp = Bytes.toLong(value);
+      } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) {
+        walLength = Bytes.toLong(value);
+      }
+    }
+    return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength);
+  }
+
+  /** Returns the number of rows currently in the WAL event tracker table. */
+  private int getTableCount(Connection connection) throws Exception {
+    Scan scan = new Scan().setReadType(Scan.ReadType.STREAM);
+    int count = 0;
+    // This is polled from waitFor(), so release the table and scanner on every call.
+    try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+      ResultScanner resultScanner = table.getScanner(scan)) {
+      while (resultScanner.next() != null) {
+        count++;
+      }
+    }
+    LOG.info("Table count: " + count);
+    return count;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java
new file mode 100644
index 00000000000..47ca5a8252b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestWALEventTrackerTableAccessor {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALEventTrackerTableAccessor.class);
+
+  /*
+   * Tests that rowkey is getting constructed correctly.
+   */
+  @Test
+  public void testRowKey() {
+    String rsName = "test-region-server";
+    String walName = "test-wal-0";
+    long timeStamp = EnvironmentEdgeManager.currentTime();
+    String walState = WALEventTrackerListener.WalState.ACTIVE.name();
+    long walLength = 100L;
+    WALEventTrackerPayload payload =
+      new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength);
+    byte[] rowKeyBytes = WALEventTrackerTableAccessor.getRowKey(payload);
+
+    String rowKeyBytesStr = Bytes.toString(rowKeyBytes);
+    String[] fields = rowKeyBytesStr.split(WALEventTrackerTableAccessor.DELIMITER, -1);
+    // This is the format of rowkey: walName_timestamp_walState;
+    assertEquals(walName, fields[0]);
+    assertEquals(timeStamp, Long.valueOf(fields[1]).longValue());
+    assertEquals(walState, fields[2]);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java
new file mode 100644
index 00000000000..4fbb03b13ee
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(SmallTests.class)
+public class TestWalEventTrackerQueueService {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWalEventTrackerQueueService.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  /*
+   * Test whether wal event tracker metrics are being incremented.
+   */
+  @Test
+  public void testMetrics() throws Exception {
+    String rsName = "test-region-server";
+    String walName = "test-wal-0";
+    long timeStamp = EnvironmentEdgeManager.currentTime();
+    String walState = WALEventTrackerListener.WalState.ACTIVE.name();
+    long walLength = 100L;
+    WALEventTrackerPayload payload =
+      new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength);
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true);
+    conf.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
+    MetricsWALEventTrackerSourceImpl source = new MetricsWALEventTrackerSourceImpl(
+      name.getMethodName(), name.getMethodName(), name.getMethodName(), name.getMethodName());
+    WALEventTrackerQueueService service = new WALEventTrackerQueueService(conf, source);
+    service.addToQueue(payload);
+    Connection mockConnection = mock(Connection.class);
+    doReturn(conf).when(mockConnection).getConfiguration();
+    // Always throw IOException whenever mock connection is being used.
+    doThrow(new IOException()).when(mockConnection).getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+    assertEquals(0L, source.getFailedPuts());
+    assertEquals(0L, source.getNumRecordsFailedPuts());
+    // Persist all the events.
+    service.persistAll(mockConnection);
+    assertEquals(1L, source.getFailedPuts());
+    assertEquals(1L, source.getNumRecordsFailedPuts());
+    // Verify that getTable was called 1 + DEFAULT_MAX_ATTEMPTS times while trying to persist.
+    verify(mockConnection, times(1 + WALEventTrackerTableAccessor.DEFAULT_MAX_ATTEMPTS))
+      .getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java
new file mode 100644
index 00000000000..00de2118795
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
+import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class })
+public class TestWALEdit {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALEdit.class);
+
+  private static final String RS_NAME = "test-region-server-name";
+
+  /**
+   * Tests that
+   * {@link org.apache.hadoop.hbase.wal.WALEdit#createReplicationMarkerEdit(byte[], long)} method is
+   * creating WALEdit with correct family and qualifier.
+   */
+  @Test
+  public void testCreateReplicationMarkerEdit() {
+    long timestamp = EnvironmentEdgeManager.currentTime();
+
+    byte[] rowkey = ReplicationMarkerChore.getRowKey(RS_NAME, timestamp);
+    WALEdit edit = WALEdit.createReplicationMarkerEdit(rowkey, timestamp);
+    assertEquals(1, edit.getCells().size());
+    Cell cell = edit.getCells().get(0);
+    assertTrue(CellUtil.matchingFamily(cell, METAFAMILY));
+    assertTrue(CellUtil.matchingQualifier(cell, REPLICATION_MARKER));
+    assertTrue(WALEdit.isReplicationMarkerEdit(edit));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java
new file mode 100644
index 00000000000..5a30bc71711
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test creates 2 mini hbase clusters. The first cluster has the
+ * "hbase.regionserver.replication.marker.enabled" conf key enabled. This will create
+ * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} which will create
+ * marker rows to be replicated to the sink cluster. The second cluster has the
+ * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled. This will persist the
+ * marker rows coming from the peer cluster to the REPLICATION.SINK_TRACKER table.
+ **/
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMarker {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationMarker.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class);
+
+  private static Configuration conf1;
+  private static Configuration conf2;
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf2 = new Configuration(conf1);
+    // Run the replication marker chore in cluster1.
+    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+    conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec
+    utility1 = new HBaseTestingUtility(conf1);
+
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    // Enable the replication sink tracker for cluster 2
+    conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true);
+    utility2 = new HBaseTestingUtility(conf2);
+
+    // Start cluster 2 first so that hbase:replicationsinktracker table gets created first.
+    utility2.startMiniCluster(1);
+    waitForReplicationTrackerTableCreation();
+
+    // Start cluster1
+    utility1.startMiniCluster(1);
+    Admin admin1 = utility1.getAdmin();
+    ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
+    rpcBuilder.setClusterKey(utility2.getClusterKey());
+    admin1.addReplicationPeer("1", rpcBuilder.build());
+
+    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
+      .getReplicationSourceService().getReplicationManager();
+    // Wait until the peer gets established.
+    Waiter.waitFor(conf1, 10000, (Waiter.Predicate) () -> manager.getSources().size() == 1);
+  }
+
+  private static void waitForReplicationTrackerTableCreation() {
+    Waiter.waitFor(conf2, 10000, (Waiter.Predicate) () -> utility2.getAdmin()
+      .tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    utility1.shutdownMiniCluster();
+    utility2.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReplicationMarkerRow() throws Exception {
+    // We have configured ReplicationMarkerChore to run every second. Sleeping so that it will
+    // create enough marker rows.
+    Thread.sleep(5000);
+    WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null);
+    String walName1ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName();
+    String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname();
+    // Since we sync the marker edits while appending to wal, all the edits should be visible
+    // to Replication threads immediately.
+    assertTrue(getReplicatedEntries() >= 5);
+    // Force log roll.
+    wal1.rollWriter(true);
+    String walName2ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName();
+    Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection();
+    // Sleep for 5 more seconds to get marker rows with new wal name.
+    Thread.sleep(5000);
+    // Wait for cluster 2 to have at least 8 tracker rows from cluster1.
+    utility2.waitFor(5000, () -> getTableCount(connection2) >= 8);
+    // Get replication marker rows from cluster2
+    List<ReplicationSinkTrackerRow> list = getRows(connection2);
+    for (ReplicationSinkTrackerRow desc : list) {
+      // All the tracker rows should have same region server name i.e. rs of cluster1
+      assertEquals(rs1Name, desc.getRegionServerName());
+      // All the tracker rows will have either wal1 or wal2 name.
+      assertTrue(walName1ForCluster1.equals(desc.getWalName())
+        || walName2ForCluster1.equals(desc.getWalName()));
+    }
+
+    // This table shouldn't exist on cluster1 since
+    // hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster.
+    assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
+    // This table should exist on cluster2 since
+    // hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster.
+    assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
+  }
+
+  /*
+   * Get rows for replication sink tracker table.
+   */
+  private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException {
+    List<ReplicationSinkTrackerRow> list = new ArrayList<>();
+    // try-with-resources so that the table and the scanner are closed even if the scan throws.
+    try (Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
+      ResultScanner scanner = table.getScanner(new Scan())) {
+      Result r;
+      while ((r = scanner.next()) != null) {
+        list.add(getPayload(r.listCells()));
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Builds a tracker row from the cells of one result. Columns that are absent keep their
+   * defaults (null for the names, 0 for timestamp and offset).
+   */
+  private ReplicationSinkTrackerRow getPayload(List<Cell> cells) {
+    String rsName = null, walName = null;
+    // Primitive with a default: a boxed null would throw NPE when unboxed into the constructor.
+    long timestamp = 0L, offset = 0L;
+    for (Cell cell : cells) {
+      byte[] qualifier = CellUtil.cloneQualifier(cell);
+      byte[] value = CellUtil.cloneValue(cell);
+
+      if (Bytes.equals(RS_COLUMN, qualifier)) {
+        rsName = Bytes.toString(value);
+      } else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) {
+        walName = Bytes.toString(value);
+      } else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) {
+        timestamp = Bytes.toLong(value);
+      } else if (Bytes.equals(OFFSET_COLUMN, qualifier)) {
+        offset = Bytes.toLong(value);
+      }
+    }
+    return new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset);
+  }
+
+  static class ReplicationSinkTrackerRow {
+    private String region_server_name;
+    private String wal_name;
+    private long timestamp;
+    private long offset;
+
+    public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp,
+      long offset) {
+      this.region_server_name = region_server_name;
+      this.wal_name = wal_name;
+      this.timestamp = timestamp;
+      this.offset = offset;
+    }
+
+    public String getRegionServerName() {
+      return region_server_name;
+    }
+
+    public String getWalName() {
+      return wal_name;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    @Override
+    public String toString() {
+      return "ReplicationSinkTrackerRow{" + "region_server_name='" + region_server_name + '\''
+        + ", wal_name='" + wal_name + '\'' + ", timestamp=" + timestamp + ", offset=" + offset
+        + '}';
+    }
+  }
+
+  /** Returns the number of rows currently in the replication sink tracker table. */
+  private int getTableCount(Connection connection) throws Exception {
+    Scan scan = new Scan().setReadType(Scan.ReadType.STREAM);
+    int count = 0;
+    // This is polled from waitFor(), so release the table and scanner on every call.
+    try (Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
+      ResultScanner resultScanner = table.getScanner(scan)) {
+      while (resultScanner.next() != null) {
+        count++;
+      }
+    }
+    LOG.info("Table count: " + count);
+    return count;
+  }
+
+  /*
+   * Return replicated entries from cluster1.
+   */
+  private long getReplicatedEntries() {
+    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
+      .getReplicationSourceService().getReplicationManager();
+    List<ReplicationSourceInterface> sources = manager.getSources();
+    assertEquals(1, sources.size());
+    ReplicationSource source = (ReplicationSource) sources.get(0);
+    return source.getTotalReplicatedEdits();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index c14c589a919..c978a412a25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey;
+import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
+import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -60,10 +64,12 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -485,6 +491,39 @@ public class TestWALSplit {
     assertEquals(11, countWAL(splitLog[0]));
   }
 
+  /**
+   * Verifies that WALSplitter handles a WAL written by
+   * {@link #generateReplicationMarkerEdits(Path, RegionInfo)} without failing and returns no
+   * split paths for it.
+   */
+  @Test(timeout = 30000)
+  public void testSplitRemovesReplicationMarkerEdits() throws IOException {
+    Path walPath = new Path(WALDIR, WAL_FILE_PREFIX + "1");
+    generateReplicationMarkerEdits(walPath, ReplicationMarkerChore.REGION_INFO);
+    useDifferentDFSClient();
+    // Sanity check: the WAL directory holds exactly the file written above.
+    List<FileStatus> walFiles =
+      SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
+    assertEquals(1, walFiles.size());
+    assertEquals(walPath, walFiles.get(0).getPath());
+    // The split must complete without throwing and must not produce any output path.
+    List<Path> splitOutput = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    assertEquals(0, splitOutput.size());
+  }
+
+  /**
+   * Creates the WAL directory and writes a WAL file at {@code path} with one appended
+   * replication marker entry.
+   * @param path       location of the WAL file to create
+   * @param regionInfo region whose encoded name is passed to the appended entry
+   * @throws IOException if creating the directory or writing the WAL fails
+   */
+  private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    fs.mkdirs(WALDIR);
+    try (Writer walWriter = wals.createWALWriter(fs, path)) {
+      WALProtos.ReplicationMarkerDescriptor marker =
+        WALProtos.ReplicationMarkerDescriptor.newBuilder().setWalName("wal-name")
+          .setRegionServerName("rs-name").setOffset(0L).build();
+      // The row key is built from the descriptor's region server name and the captured time.
+      appendEntry(walWriter, REPLICATION_SINK_TRACKER_TABLE_NAME,
+        regionInfo.getEncodedNameAsBytes(), getRowKey(marker.getRegionServerName(), now),
+        METAFAMILY, REPLICATION_MARKER, VALUE, 1);
+    }
+  }
+
   /**
    * @param expectedEntries -1 to not assert
    * @return the count across all regions