You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/10/02 19:31:29 UTC

[hbase] branch HBASE-18070 updated: HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable w… (#2451)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch HBASE-18070
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-18070 by this push:
     new a2762d1  HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable w… (#2451)
a2762d1 is described below

commit a2762d17f686bcbc0fb2941988a5e2542010ec2f
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Fri Oct 2 12:29:18 2020 -0700

    HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable w… (#2451)
    
    * HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable when hbase:meta assigned to RS
    
    Fill in gap left by HBASE-11183 'Timeline Consistent region replicas - Phase 2 design'.
    HBASE-11183 left off implementing 'async WAL Replication' on the hbase:meta
    Table; hbase:meta Table could only do Phase 1 Region Replicas reading
    the primary Regions' hfiles. Here we add 'async WAL Replication' to
    hbase:meta so Replicas can be more current with the primary's changes.
    
    Adds a 'special' ReplicationSource that reads hbase:meta WAL files and replicates
    all edits to the configured in-cluster endpoint (Defaults to the
    RegionReadReplicaEndpoint.class -- set hbase.region.replica.catalog.replication to
    target a different endpoint implementation).
    
    Set hbase.region.replica.replication.catalog.enabled to enable async WAL
    Replication for hbase:meta region replicas. Its off by default.
    
    The CatalogReplicationSource for async WAL Replication of hbase:meta does
    NOT need to keep up WAL offset or a queue of WALs-to-replicate in the
    replication queue store as is done in other ReplicationSource implementations;
    the CatalogReplicationSource is for Region Replicas only. General
    Replication does not replicate hbase:meta. hbase:meta Region Replicas reset
    on crash of the primary replica so there is no need to 'recover'
    replication that was running on the crashed server.
    
    Because it so different in operation, the CatalogReplicationSource is bolted
    on to the side of the ReplicationSourceManager. It is lazily
    instantiated to match the lazy instantiation of the hbase:meta
    WALProvider, created and started on the open of the first Region of an
    hbase:meta table. Thereafter it stays up till the process dies, even if
    all hbase:meta Regions have moved off the server, in case a hbase:meta
    Region is moved back (Doing this latter simplifies the implementation)
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
      Read configuration to see if we need to wait on setting a Region read-enabled
      (if so, replicas will only flip to enable read after confirming a
      flush of the primary so they for sure are a replica of a known point)
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
     If configured, on open of hbase:meta, ask the ReplicationSourceManager
     to add a ReplicationSource (if it hasn't already).
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
     Edit log message.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
     If configured, on close of hbase:meta, update ReplicationSourceManager
     that a source Region has closed.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
     javadoc and make constructor private.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
     Add logPositionAndCleanOldLogs w/ default of the old behavior so
     CatalogReplicationSource can bypass updating store with WAL position,
     etc.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
     Add creation and start of an CatalogReplicationSource.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
     Go via ReplicationSource when calling logPostionAndCleanOldLogs so new RS can intercept.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
     Javadoc.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
     Add utility for reading configurations for hbase:meta region replicas.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
     Javadoc.
    
    hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
     Use define.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
     Specical version of ReplicationSource for Region Replicas on hbase:meta.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
     Needs a special peer too (peers are baked into replication though we don't use 'peers' here)
    
    hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
    hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALProvider.java
     Tests.
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Huaxiang Sun <hu...@apache.com>
---
 .../hadoop/hbase/master/MasterMetaBootstrap.java   |   3 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   6 +-
 .../regionserver/handler/AssignRegionHandler.java  |  15 +-
 .../handler/RegionReplicaFlushHandler.java         |   6 +-
 .../handler/UnassignRegionHandler.java             |  12 +-
 ...Provider.java => CatalogReplicationSource.java} |  27 +-
 .../regionserver/CatalogReplicationSourcePeer.java |  50 ++++
 .../regionserver/NoopReplicationQueueStorage.java  | 140 +++++++++
 .../regionserver/RecoveredReplicationSource.java   |   2 +-
 .../RegionReplicaReplicationEndpoint.java          |   2 +-
 .../replication/regionserver/Replication.java      |   9 +-
 .../regionserver/ReplicationSource.java            |  25 +-
 .../regionserver/ReplicationSourceFactory.java     |   9 +-
 .../regionserver/ReplicationSourceInterface.java   |  22 +-
 .../regionserver/ReplicationSourceManager.java     | 110 ++++++-
 .../regionserver/ReplicationSourceShipper.java     |   4 +-
 .../regionserver/SerialReplicationChecker.java     |   3 +-
 .../regionserver/WALFileLengthProvider.java        |   3 +
 .../hadoop/hbase/util/ServerRegionReplicaUtil.java |  76 +++--
 .../org/apache/hadoop/hbase/wal/WALFactory.java    |  18 +-
 .../hbase/replication/ReplicationSourceDummy.java  |  10 +-
 .../TestMetaRegionReplicaReplicationEndpoint.java  | 320 +++++++++++++++++++++
 .../TestRegionReplicaReplicationEndpoint.java      |  12 +-
 .../regionserver/TestSerialReplicationChecker.java |   2 +-
 24 files changed, 778 insertions(+), 108 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
index c676df8..3e139b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
@@ -73,8 +73,7 @@ class MasterMetaBootstrap {
         LOG.info(hri.getRegionNameAsString() +
           " old location is same as current hbase:meta location; setting location as null...");
       }
-      // These assigns run inline. All is blocked till they complete. Only interrupt is shutting
-      // down hosting server which calls AM#stop.
+      // Assigns are done asynchronously. See HBASE-24562.
       if (metaState != null && metaState.getServerName() != null) {
         // Try to retain old assignment.
         assignmentManager.assignAsync(hri, metaState.getServerName());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8abede5..1293a75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2455,9 +2455,9 @@ public class HRegionServer extends Thread implements
     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
       return;
     }
-    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
-        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
-          region.conf)) {
+    TableName tn = region.getTableDescriptor().getTableName();
+    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
+        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
       region.setReadsEnabled(true);
       return;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 98d09b2..4ee6efc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.regionserver.handler;
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,8 +133,15 @@ public class AssignRegionHandler extends EventHandler {
       }
       // pass null for the last parameter, which used to be a CancelableProgressable, as now the
       // opening can not be interrupted by a close request any more.
-      region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(),
-        rs, null);
+      Configuration conf = rs.getConfiguration();
+      TableName tn = htd.getTableName();
+      if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
+        if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
+          // Add the hbase:meta replication source on replica zero/default.
+          rs.getReplicationSourceService().getReplicationManager().addCatalogReplicationSource();
+        }
+      }
+      region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
     } catch (IOException e) {
       cleanUpAndReportFailure(e);
       return;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index dddf553..829d0bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -100,9 +100,9 @@ public class RegionReplicaFlushHandler extends EventHandler {
     RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("RPC'ing to primary region replica " +
-        ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
-        region.getRegionInfo() + " to trigger FLUSH");
+      LOG.debug("RPC'ing to primary " + ServerRegionReplicaUtil.
+          getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionNameAsString() +
+        " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
     }
     while (!region.isClosing() && !region.isClosed()
         && !server.isAborted() && !server.isStopped()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 0bf2543a..1ed74bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,12 +115,20 @@ public class UnassignRegionHandler extends EventHandler {
     if (region.close(abort) == null) {
       // XXX: Is this still possible? The old comment says about split, but now split is done at
       // master side, so...
-      LOG.warn("Can't close {} already closed during close()", regionName);
+      LOG.warn("Can't close {}, already closed during close()", regionName);
       rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
       return;
     }
 
     rs.removeRegion(region, destination);
+    if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
+        region.getTableDescriptor().getTableName())) {
+      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
+        // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
+        // See assign region handler where we add the replication source on open.
+        rs.getReplicationSourceService().getReplicationManager().removeCatalogReplicationSource();
+      }
+    }
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
         -1, region.getRegionInfo()))) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
similarity index 50%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
index 010fa69..f36514d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,18 +17,27 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.util.OptionalLong;
-
-import org.apache.hadoop.fs.Path;
+import java.util.Collections;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Used by replication to prevent replicating unacked log entries. See
- * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+ * ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
+ * all WALEdits from these WALs. This ReplicationSource is NOT created via
+ * {@link ReplicationSourceFactory}.
  */
 @InterfaceAudience.Private
-@FunctionalInterface
-public interface WALFileLengthProvider {
+class CatalogReplicationSource extends ReplicationSource {
+  CatalogReplicationSource() {
+    // Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
+    // filtered out in the 'super' class default implementation).
+    super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
+  }
 
-  OptionalLong getLogFileSizeIfBeingWritten(Path path);
+  @Override
+  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
+    // Noop. This implementation does not persist state to backing storage nor does it keep its
+    // WALs in a general map up in ReplicationSourceManager so just skip calling through to the
+    // default implemenentation.
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
new file mode 100644
index 0000000..cb00ac2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The 'peer' used internally by Catalog Region Replicas Replication Source.
+ * The Replication system has 'peer' baked into its core so though we do not need 'peering', we
+ * need a 'peer' and its configuration else the replication system breaks at a few locales.
+ * Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
+ */
+@InterfaceAudience.Private
+class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
+  /**
+   * @param clusterKey Usually the UUID from zk passed in by caller as a String.
+   */
+  CatalogReplicationSourcePeer(Configuration configuration, String clusterKey, String peerId) {
+    super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
+      ReplicationPeerConfig.newBuilder().
+        setClusterKey(clusterKey).
+        setReplicationEndpointImpl(
+          configuration.get("hbase.region.replica.catalog.replication",
+            RegionReplicaReplicationEndpoint.class.getName())).
+        setBandwidth(0). // '0' means no bandwidth.
+        setSerial(false).
+        build(),
+      true, SyncReplicationState.NONE, SyncReplicationState.NONE);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java
new file mode 100644
index 0000000..4ad41fc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Noop queue storage -- does nothing.
+ */
+@InterfaceAudience.Private
+class NoopReplicationQueueStorage implements ReplicationQueueStorage {
+  NoopReplicationQueueStorage() {}
+
+  @Override
+  public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {}
+
+  @Override
+  public void addWAL(ServerName serverName, String queueId, String fileName)
+    throws ReplicationException {}
+
+  @Override
+  public void removeWAL(ServerName serverName, String queueId, String fileName)
+    throws ReplicationException { }
+
+  @Override
+  public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
+    Map<String, Long> lastSeqIds) throws ReplicationException {}
+
+  @Override
+  public long getLastSequenceId(String encodedRegionName, String peerId)
+    throws ReplicationException {
+    return 0;
+  }
+
+  @Override
+  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+    throws ReplicationException {}
+
+  @Override
+  public void removeLastSequenceIds(String peerId) throws ReplicationException {}
+
+  @Override
+  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+    throws ReplicationException {}
+
+  @Override
+  public long getWALPosition(ServerName serverName, String queueId, String fileName)
+    throws ReplicationException {
+    return 0;
+  }
+
+  @Override
+  public List<String> getWALsInQueue(ServerName serverName, String queueId)
+      throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
+    ServerName destServerName) throws ReplicationException {
+    return null;
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(ServerName serverName)
+    throws ReplicationException {}
+
+  @Override
+  public List<ServerName> getListOfReplicators() throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Set<String> getAllWALs() throws ReplicationException {
+    return Collections.EMPTY_SET;
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {}
+
+  @Override
+  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {}
+
+  @Override
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+    throws ReplicationException {}
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {}
+
+  @Override
+  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Set<String> getAllHFileRefs() throws ReplicationException {
+    return Collections.EMPTY_SET;
+  }
+
+  @Override
+  public String getRsNode(ServerName serverName) {
+    return null;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 00aa026..aadce39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 6e09077..70a9280 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -368,7 +368,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
         return true;
       } else {
-        LOG.warn("Failed to replicate all entris, retry={}", retryCounter.getAttemptTimes());
+        LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
         if (!retryCounter.shouldRetry()) {
           return false;
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index d8a696c..0f0ac6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -51,9 +50,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 
 /**
@@ -129,14 +126,14 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
     this.globalMetricsSource = CompatibilitySingletonFactory
         .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
-    WALProvider walProvider = walFactory.getWALProvider();
     this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
-        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
-        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
+        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory,
         mapping, globalMetricsSource);
     this.syncReplicationPeerInfoProvider =
         new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
     PeerActionListener peerActionListener = PeerActionListener.DUMMY;
+    // Get the user-space WAL provider
+    WALProvider walProvider = walFactory != null? walFactory.getWALProvider(): null;
     if (walProvider != null) {
       walProvider
         .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index cb9a14d..11d6679 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -66,10 +66,10 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -222,6 +222,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
 
+    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
@@ -372,10 +373,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
     workerThreads.compute(walGroupId, (key, value) -> {
       if (value != null) {
-        LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
+        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
         return value;
       } else {
-       LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
+        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
         ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
         ReplicationSourceWALReader walReader =
             createNewWALReader(walGroupId, queue, worker.getStartPosition());
@@ -520,7 +521,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   private long getCurrentBandwidth() {
     long peerBandwidth = replicationPeer.getPeerBandwidth();
-    // user can set peer bandwidth to 0 to use default bandwidth
+    // User can set peer bandwidth to 0 to use default bandwidth.
     return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
   }
 
@@ -603,11 +604,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
       this.startupOngoing.set(false);
       throw new IllegalStateException("Source should be active.");
     }
-    LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
-      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
-
+    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
+      logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
+      peerClusterId);
     initializeWALEntryFilter(peerClusterId);
-    // start workers
+    // Start workers
     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
       String walGroupId = entry.getKey();
       PriorityBlockingQueue<Path> queue = entry.getValue();
@@ -617,9 +618,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
-  public void startup() {
+  public ReplicationSourceInterface startup() {
     if (this.sourceRunning) {
-      return;
+      return this;
     }
     this.sourceRunning = true;
     //Flag that signalizes uncaught error happening while starting up the source
@@ -640,6 +641,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
           });
       }
     } while (this.startupOngoing.get() && !this.abortOnError);
+    return this;
   }
 
   @Override
@@ -843,7 +845,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return server;
   }
 
-  ReplicationQueueStorage getQueueStorage() {
+  @Override
+  public ReplicationQueueStorage getReplicationQueueStorage() {
     return queueStorage;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index d613049..8863f14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -19,19 +19,22 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 
 /**
  * Constructs a {@link ReplicationSourceInterface}
+ * Note, not used to create specialized ReplicationSources
+ * @see CatalogReplicationSource
  */
 @InterfaceAudience.Private
-public class ReplicationSourceFactory {
-
+public final class ReplicationSourceFactory {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
 
+  private ReplicationSourceFactory() {}
+
   static ReplicationSourceInterface create(Configuration conf, String queueId) {
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
     boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 0bd90cf..27e4b79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,7 +42,6 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public interface ReplicationSourceInterface {
-
   /**
    * Initializer for the source
    * @param conf the configuration to use
@@ -76,7 +74,7 @@ public interface ReplicationSourceInterface {
   /**
    * Start the replication
    */
-  void startup();
+  ReplicationSourceInterface startup();
 
   /**
    * End the replication
@@ -174,7 +172,6 @@ public interface ReplicationSourceInterface {
   /**
    * Try to throttle when the peer config with a bandwidth
    * @param batchSize entries size will be pushed
-   * @throws InterruptedException
    */
   void tryThrottle(int batchSize) throws InterruptedException;
 
@@ -206,4 +203,21 @@ public interface ReplicationSourceInterface {
   default boolean isRecovered() {
     return false;
   }
+
+  /**
+   * @return The instance of queueStorage used by this ReplicationSource.
+   */
+  ReplicationQueueStorage getReplicationQueueStorage();
+
+  /**
+   * Log the current position to storage. Also clean old logs from the replication queue.
+   * Use to bypass the default call to
+   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface,
+   * WALEntryBatch)} whem implementation does not need to persist state to backing storage.
+   * @param entryBatch the wal entry batch we just shipped
+   * @return The instance of queueStorage used by this ReplicationSource.
+   */
+  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
+    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0940b5a..8527f96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -40,6 +41,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -62,13 +65,15 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -126,7 +131,15 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final ConcurrentMap<String, ReplicationSourceInterface> sources;
   // List of all the sources we got from died RSs
   private final List<ReplicationSourceInterface> oldsources;
+
+  /**
+   * Storage for queues that need persistance; e.g. Replication state so can be recovered
+   * after a crash. queueStorage upkeep is spread about this class and passed
+   * to ReplicationSource instances for these to do updates themselves. Not all ReplicationSource
+   * instances keep state.
+   */
   private final ReplicationQueueStorage queueStorage;
+
   private final ReplicationTracker replicationTracker;
   private final ReplicationPeers replicationPeers;
   // UUID for this cluster
@@ -153,7 +166,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Path logDir;
   // Path to the wal archive
   private final Path oldLogDir;
-  private final WALFileLengthProvider walFileLengthProvider;
+  private final WALFactory walFactory;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
@@ -174,21 +187,29 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
+   * A special ReplicationSource for hbase:meta Region Read Replicas.
+   * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
+   * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
+   * the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
+   * this server (in case it later gets moved back). We synchronize on this instance testing for
+   * presence and if absent, while creating so only created and started once.
+   */
+  @VisibleForTesting
+  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
+
+  /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
-   * @param replicationPeers
-   * @param replicationTracker
    * @param conf the configuration to use
    * @param server the server for this region server
    * @param fs the file system to use
    * @param logDir the directory that contains all wal directories of live RSs
    * @param oldLogDir the directory where old logs are archived
-   * @param clusterId
    */
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
-      WALFileLengthProvider walFileLengthProvider,
+      WALFactory walFactory,
       SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
       MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
     this.sources = new ConcurrentHashMap<>();
@@ -206,7 +227,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     // 30 seconds
     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
     this.clusterId = clusterId;
-    this.walFileLengthProvider = walFileLengthProvider;
+    this.walFactory = walFactory;
     this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
     this.replicationTracker.registerListener(this);
     // It's preferable to failover 1 RS at a time, but with good zk servers
@@ -346,18 +367,21 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Factory method to create a replication source
-   * @param queueId the id of the replication queue
-   * @return the created source
+   * @return a new 'classic' user-space replication source.
+   * @param queueId the id of the replication queue to associate the ReplicationSource with.
+   * @see #createCatalogReplicationSource() for creating a ReplicationSource for hbase:meta.
    */
   private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
       throws IOException {
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
-
-    MetricsSource metrics = new MetricsSource(queueId);
-    // init replication source
+    // Init the just created replication source. Pass the default walProvider's wal file length
+    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
+    // replication, see #createCatalogReplicationSource().
+    WALFileLengthProvider walFileLengthProvider =
+      this.walFactory.getWALProvider() != null?
+        this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
     src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
-      walFileLengthProvider, metrics);
+      walFileLengthProvider, new MetricsSource(queueId));
     return src;
   }
 
@@ -1154,4 +1178,60 @@ public class ReplicationSourceManager implements ReplicationListener {
   MetricsReplicationGlobalSourceSource getGlobalMetrics() {
     return this.globalMetrics;
   }
+
+  /**
+   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
+   * @see #removeCatalogReplicationSource()
+   */
+  public ReplicationSourceInterface addCatalogReplicationSource() throws IOException {
+    // Open/Create the hbase:meta ReplicationSource once only.
+    synchronized (this.catalogReplicationSource) {
+      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
+      return rs != null ? rs :
+        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource());
+    }
+  }
+
+  /**
+   * Remove the hbase:meta Catalog replication source.
+   * Called when we close hbase:meta.
+   * @see #addCatalogReplicationSource()
+   */
+  public void removeCatalogReplicationSource() {
+    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
+    // comes back to this server.
+  }
+
+  /**
+   * Create, initialize, and start the Catalog ReplicationSource.
+   */
+  private ReplicationSourceInterface createCatalogReplicationSource() throws IOException {
+    // Has the hbase:meta WALProvider been instantiated?
+    WALProvider walProvider = this.walFactory.getMetaWALProvider();
+    boolean addListener = false;
+    if (walProvider == null) {
+      // The meta walProvider has not been instantiated. Create it.
+      walProvider = this.walFactory.getMetaProvider();
+      addListener = true;
+    }
+    CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
+      this.clusterId.toString(), "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
+    final ReplicationSourceInterface crs = new CatalogReplicationSource();
+    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
+      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
+    if (addListener) {
+      walProvider.addWALActionsListener(new WALActionsListener() {
+        @Override
+        public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+          crs.enqueueLog(newPath);
+        }
+      });
+    } else {
+      // This is a problem. We'll have a ReplicationSource but no listener on hbase:meta WALs
+      // so nothing will be replicated.
+      LOG.error("Did not install WALActionsListener creating CatalogReplicationSource!");
+    }
+    // Start this ReplicationSource.
+    return crs.startup();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 45eb91c..d3af995 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
@@ -271,7 +269,7 @@ public class ReplicationSourceShipper extends Thread {
     // position and the file will be removed soon in cleanOldLogs.
     if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
       batch.getLastWalPosition() != currentPosition) {
-      source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
+      source.logPositionAndCleanOldLogs(batch);
       updated = true;
     }
     // if end of file is true, then we can just skip to the next file in queue.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
index 6b3c34a..fdc1e54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
@@ -140,7 +139,7 @@ class SerialReplicationChecker {
 
   public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
     this.peerId = source.getPeerId();
-    this.storage = source.getQueueStorage();
+    this.storage = source.getReplicationQueueStorage();
     this.conn = source.getServer().getConnection();
     this.waitTimeMs =
       conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index 010fa69..c60faa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -25,6 +25,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 /**
  * Used by replication to prevent replicating unacked log entries. See
  * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+ * WALFileLengthProvider exists because we do not want to reference WALFactory and WALProvider
+ * directly in the replication code so in the future it will be easier to decouple them.
+ * Each walProvider will have its own implementation.
  */
 @InterfaceAudience.Private
 @FunctionalInterface
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index b83749d..4ee9587 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -58,7 +59,15 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   public static final String REGION_REPLICA_REPLICATION_CONF_KEY
     = "hbase.region.replica.replication.enabled";
   private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
-  private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
+  public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
+
+  /**
+   * Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
+   */
+  public static final String REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY
+    = "hbase.region.replica.replication.catalog.enabled";
+  private static final boolean DEFAULT_REGION_REPLICA_REPLICATION_CATALOG = false;
+
 
   /**
    * Enables or disables refreshing store files of secondary region replicas when the memory is
@@ -117,7 +126,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
    * files of the primary region, so an HFileLink is used to construct the StoreFileInfo. This
    * way ensures that the secondary will be able to continue reading the store files even if
    * they are moved to archive after compaction
-   * @throws IOException
    */
   public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
       RegionInfo regionInfo, RegionInfo regionInfoForFs, String familyName, Path path)
@@ -154,47 +162,68 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   }
 
   /**
-   * Create replication peer for replicating to region replicas if needed.
+   * Create replication peer for replicating user-space Region Read Replicas.
    * @param conf configuration to use
-   * @throws IOException
    */
   public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
     if (!isRegionReplicaReplicationEnabled(conf)) {
       return;
     }
-
+    String peerId = REGION_REPLICA_REPLICATION_PEER;
     try (Connection connection = ConnectionFactory.createConnection(conf);
       Admin admin = connection.getAdmin()) {
       ReplicationPeerConfig peerConfig = null;
       try {
-        peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
+        peerConfig = admin.getReplicationPeerConfig(peerId);
       } catch (ReplicationPeerNotFoundException e) {
-        LOG.warn(
-          "Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER + " not exist",
-          e);
+        LOG.warn("Region replica peer id={} does not exist", peerId, e);
       }
-
       if (peerConfig == null) {
-        LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
-          + " not exist. Creating...");
-        peerConfig = new ReplicationPeerConfig();
-        peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
-        peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
-        admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig);
+        LOG.info("Region Read Replica peerId={} does not exist; creating...", peerId);
+        peerConfig = ReplicationPeerConfig.newBuilder().
+          setClusterKey(ZKConfig.getZooKeeperClusterKey(conf)).
+          setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()).build();
+        admin.addReplicationPeer(peerId, peerConfig);
       }
     }
   }
 
-  public static boolean isRegionReplicaReplicationEnabled(Configuration conf) {
-    return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY,
-      DEFAULT_REGION_REPLICA_REPLICATION);
+  /**
+   * @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
+   *   user-space tables).
+   */
+  public static boolean isRegionReplicaReplicationEnabled(Configuration conf, TableName tn) {
+    return isMetaRegionReplicaReplicationEnabled(conf, tn) ||
+      isRegionReplicaReplicationEnabled(conf);
+  }
+
+  /**
+   * @return True if Region Read Replica is enabled for user-space tables.
+   */
+  private static boolean isRegionReplicaReplicationEnabled(Configuration conf) {
+    return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION);
   }
 
+  /**
+   * @return True if hbase:meta Region Read Replica is enabled.
+   */
+  public static boolean isMetaRegionReplicaReplicationEnabled(Configuration conf, TableName tn) {
+    return TableName.isMetaTableName(tn) &&
+      conf.getBoolean(REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
+        DEFAULT_REGION_REPLICA_REPLICATION_CATALOG);
+  }
+
+  /**
+   * @return True if wait for primary to flush is enabled for user-space tables.
+   */
   public static boolean isRegionReplicaWaitForPrimaryFlushEnabled(Configuration conf) {
     return conf.getBoolean(REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY,
       DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH);
   }
 
+  /**
+   * @return True if we are to refresh user-space hfiles in Region Read Replicas.
+   */
   public static boolean isRegionReplicaStoreFileRefreshEnabled(Configuration conf) {
     return conf.getBoolean(REGION_REPLICA_STORE_FILE_REFRESH,
       DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH);
@@ -205,11 +234,4 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
       DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER);
   }
 
-  /**
-   * Return the peer id used for replicating to secondary region replicas
-   */
-  public static String getReplicationPeerId() {
-    return REGION_REPLICA_REPLICATION_PEER;
-  }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 26b8727..b530d13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -66,7 +65,7 @@ public class WALFactory {
   /**
    * Maps between configuration names for providers and implementation classes.
    */
-  static enum Providers {
+  enum Providers {
     defaultProvider(AsyncFSWALProvider.class),
     filesystem(FSHLogProvider.class),
     multiwal(RegionGroupingProvider.class),
@@ -255,8 +254,12 @@ public class WALFactory {
     return provider.getWALs();
   }
 
-  @VisibleForTesting
-  WALProvider getMetaProvider() throws IOException {
+  /**
+   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
+   * creating the first hbase:meta WAL so we can register a listener.
+   * @see #getMetaWALProvider()
+   */
+  public WALProvider getMetaProvider() throws IOException {
     for (;;) {
       WALProvider provider = this.metaProvider.get();
       if (provider != null) {
@@ -307,7 +310,6 @@ public class WALFactory {
    * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
    * then just seek back to the last known good position.
    * @return A WAL reader.  Close when done with it.
-   * @throws IOException
    */
   public Reader createReader(final FileSystem fs, final Path path,
       CancelableProgressable reporter) throws IOException {
@@ -486,6 +488,10 @@ public class WALFactory {
     return this.provider;
   }
 
+  /**
+   * @return Current metaProvider... may be null if not yet initialized.
+   * @see #getMetaProvider()
+   */
   public final WALProvider getMetaWALProvider() {
     return this.metaProvider.get();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a361c44..cab01d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -71,8 +71,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public void startup() {
+  public ReplicationSourceInterface startup() {
     startup.set(true);
+    return this;
   }
 
   public boolean isStartup() {
@@ -163,6 +164,11 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
+  public ReplicationQueueStorage getReplicationQueueStorage() {
+    return null;
+  }
+
+  @Override
   public ReplicationPeer getPeer() {
     return replicationPeer;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
new file mode 100644
index 0000000..3bf0b9a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
+ * verifying async wal replication replays the edits to the secondary region in various scenarios.
+ * @see TestRegionReplicaReplicationEndpoint
+ */
+@Category({LargeTests.class})
+public class TestMetaRegionReplicaReplicationEndpoint {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+  private static final int NB_SERVERS = 3;
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void before() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+    conf.setInt("replication.source.size.capacity", 10240);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+    // Enable hbase:meta replication.
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+    // Set hbase:meta replicas to be 3.
+    conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS);
+    HTU.startMiniCluster(NB_SERVERS);
+    HTU.waitFor(30000,
+      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS);
+  }
+
+  @After
+  public void after() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  /**
+   * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
+   */
+  @Test
+  public void testHBaseMetaReplicationSourceCreatedOnOpen()
+    throws IOException, InterruptedException {
+    MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
+    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
+    // Now move the hbase:meta and make sure the ReplicationSoruce is in both places.
+    HRegionServer hrsOther = null;
+    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
+      hrsOther = cluster.getRegionServer(i);
+      if (hrsOther.getServerName().equals(hrs.getServerName())) {
+        hrsOther = null;
+        continue;
+      }
+      break;
+    }
+    assertNotNull(hrsOther);
+    assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
+    Region meta = null;
+    for (Region region: hrs.getOnlineRegionsLocalContext()) {
+      if (region.getRegionInfo().isMetaRegion()) {
+        meta = region;
+        break;
+      }
+    }
+    assertNotNull(meta);
+    HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
+    // Assert that there is a ReplicationSource in both places now.
+    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
+    assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
+  }
+
+  /**
+   * @return Whether the special meta region replica peer is enabled on <code>hrs</code>
+   */
+  private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
+    return hrs.getReplicationSourceService().getReplicationManager().
+      catalogReplicationSource.get() != null;
+  }
+
+  /**
+   * Test meta region replica replication. Create some tables and see if replicas pick up the
+   * additions.
+   */
+  @Test
+  public void testHBaseMetaReplicates() throws Exception {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
+      HConstants.CATALOG_FAMILY,
+        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length)))  {
+      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+    }
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length)))  {
+      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+      // Try delete.
+      HTU.deleteTableIfAny(table.getName());
+      verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName());
+    }
+  }
+
+  /**
+   * Replicas come online after primary.
+   */
+  private void waitForMetaReplicasToOnline() throws IOException {
+    final RegionLocator regionLocator =
+      HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
+    HTU.waitFor(10000,
+      // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
+      // Pass reload to force us to skip cache else it just keeps returning default.
+      () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
+        filter(Objects::nonNull).count() >= NB_SERVERS);
+    List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
+    LOG.info("Found locations {}", locations);
+    assertEquals(NB_SERVERS, locations.size());
+  }
+
+  /**
+   * Scan hbase:meta for <code>tableName</code> content.
+   */
+  private List<Result> getMetaCells(TableName tableName) throws IOException {
+    final List<Result> results = new ArrayList<>();
+    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
+      @Override public boolean visit(Result r) throws IOException {
+        results.add(r);
+        return true;
+      }
+    };
+    MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
+    return results;
+  }
+
+  /**
+   * @return All Regions for tableName including Replicas.
+   */
+  private Region [] getAllRegions(TableName tableName, int replication) {
+    final Region[] regions = new Region[replication];
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+    return regions;
+  }
+
+  /**
+   * Verify when a Table is deleted from primary, then there are no references in replicas
+   * (because they get the delete of the table rows too).
+   */
+  private void verifyDeletedReplication(TableName tableName, int regionReplication,
+      final TableName deletedTableName) {
+    final Region[] regions = getAllRegions(tableName, regionReplication);
+
+    // Start count at '1' so we skip default, primary replica and only look at secondaries.
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
+          try (RegionScanner rs = region.getScanner(new Scan())) {
+            List<Cell> cells = new ArrayList<>();
+            while (rs.next(cells)) {
+              continue;
+            }
+            return doesNotContain(cells, deletedTableName);
+          } catch(Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed
+   * by HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
+   * <code>cells</code>.
+   */
+  private boolean doesNotContain(List<Cell> cells, TableName tableName) {
+    for (Cell cell: cells) {
+      String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+      if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Verify Replicas have results (exactly).
+   */
+  private void verifyReplication(TableName tableName, int regionReplication,
+      List<Result> contains) {
+    final Region[] regions = getAllRegions(tableName, regionReplication);
+
+    // Start count at '1' so we skip default, primary replica and only look at secondaries.
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
+          try (RegionScanner rs = region.getScanner(new Scan())) {
+            List<Cell> cells = new ArrayList<>();
+            while (rs.next(cells)) {
+              continue;
+            }
+            return contains(contains, cells);
+          } catch(Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Presumes sorted Cells. Verify that <code>cells</code> has <code>contains</code> at least.
+   */
+  static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
+    CellScanner containsScanner = CellUtil.createCellScanner(contains);
+    CellScanner cellsScanner = CellUtil.createCellScanner(cells);
+    int matches = 0;
+    int count = 0;
+    while (containsScanner.advance()) {
+      while (cellsScanner.advance()) {
+        count++;
+        LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
+        if (containsScanner.current().equals(cellsScanner.current())) {
+          matches++;
+          break;
+        }
+      }
+    }
+    return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 56afcda..5456058 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -127,7 +127,7 @@ public class TestRegionReplicaReplicationEndpoint {
     // and replication started.
     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
       Admin admin = connection.getAdmin()) {
-      String peerId = "region_replica_replication";
+      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
 
       ReplicationPeerConfig peerConfig = null;
       try {
@@ -416,7 +416,7 @@ public class TestRegionReplicaReplicationEndpoint {
     HTU.getAdmin().createTable(htd);
 
     // both tables are created, now pause replication
-    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
+    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
 
     // now that the replication is disabled, write to the table to be dropped, then drop the table.
 
@@ -450,9 +450,9 @@ public class TestRegionReplicaReplicationEndpoint {
     MetricsSource metrics = mock(MetricsSource.class);
     ReplicationEndpoint.Context ctx =
       new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
-        HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
+        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
         UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
-          .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
+          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
         metrics, rs.getTableDescriptors(), rs);
     RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
     rrpe.init(ctx);
@@ -476,7 +476,7 @@ public class TestRegionReplicaReplicationEndpoint {
       HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
 
       // now enable the replication
-      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
+      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
 
       verifyReplication(tableName, regionReplication, 0, 1000);
     } finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 6be09c0..eb445f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -109,7 +109,7 @@ public class TestSerialReplicationChecker {
   public void setUp() throws IOException {
     ReplicationSource source = mock(ReplicationSource.class);
     when(source.getPeerId()).thenReturn(PEER_ID);
-    when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
     conn = mock(Connection.class);
     when(conn.isClosed()).thenReturn(false);
     doAnswer(new Answer<Table>() {