Posted to commits@hbase.apache.org by st...@apache.org on 2020/08/06 16:29:30 UTC

[hbase] branch branch-2 updated: HBASE-24817 Allow configuring WALEntry filters on ReplicationSource (#2198)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 9a564dc  HBASE-24817 Allow configuring WALEntry filters on ReplicationSource (#2198)
9a564dc is described below

commit 9a564dc2bff0f55d2b0ef93a2cdf74cdbde578ba
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Thu Aug 6 09:29:08 2020 -0700

    HBASE-24817 Allow configuring WALEntry filters on ReplicationSource (#2198)
    
    Allow specifying base WALEntry filter on construction of
    ReplicationSource. Add means of being able to filter WALs by name.
    
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
     Add constructor that allows passing a predicate for filtering *in* WALs
     and a list of filters for filtering *out* WALEntries. The latter was
     hardcoded to filter out system-table WALEntries. The former did not
     exist but we'll need it if Replication takes in more than just the
     default Provider.
    
    Signed-off-by: Anoop Sam John <an...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../replication/SystemTableWALEntryFilter.java     |  12 +-
 .../hadoop/hbase/replication/WALEntryFilter.java   |   8 +-
 .../regionserver/ReplicationSource.java            |  86 ++++++--
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |  17 +-
 .../regionserver/TestReplicationSource.java        | 239 ++++++++++++++-------
 5 files changed, 247 insertions(+), 115 deletions(-)
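
A note for readers of this patch: the commit message above describes two filtering levels. The sketch below is only an illustration (not part of the commit) of how they fit together; the example class name is hypothetical, and it assumes it sits in the org.apache.hadoop.hbase.replication.regionserver package because the new constructors are package-private.

    package org.apache.hadoop.hbase.replication.regionserver;

    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
    import org.apache.hadoop.hbase.replication.WALEntryFilter;
    import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
    import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

    // Hypothetical example class, not part of this commit.
    final class ReplicationSourceFilterExample {
      static ReplicationSource defaultFiltering() {
        // Level 1: filter *in* whole WALs by Path; 'true' means "replicate this WAL".
        Predicate<Path> filterInWALs = p -> !AbstractFSWALProvider.isMetaFile(p);
        // Level 2: filter *out* individual WALEntries; a filter returning null drops the entry.
        List<WALEntryFilter> baseFilterOutWALEntries =
          Lists.newArrayList(new SystemTableWALEntryFilter());
        // Same defaults the new no-arg ReplicationSource() constructor wires up.
        // The source still needs init(...) and startup() before use, as in the tests below.
        return new ReplicationSource(filterInWALs, baseFilterOutWALEntries);
      }
    }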

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
index 1984436..3cda94a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,20 +17,16 @@
  */
 
 package org.apache.hadoop.hbase.replication;
-
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Skips WAL edits for all System tables including META
+ * Skips WAL edits for all System tables including hbase:meta.
  */
 @InterfaceAudience.Private
 public class SystemTableWALEntryFilter implements WALEntryFilter {
   @Override
   public Entry filter(Entry entry) {
-    if (entry.getKey().getTableName().isSystemTable()) {
-      return null;
-    }
-    return entry;
+    return entry.getKey().getTableName().isSystemTable()? null: entry;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index cd3f1bd..23c1c60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.replication;
-
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A Filter for WAL entries before being sent over to replication. Multiple
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public interface WALEntryFilter {
-
   /**
    * <p>
    * Applies the filter, possibly returning a different Entry instance. If null is returned, the
@@ -49,5 +47,5 @@ public interface WALEntryFilter {
    * @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause
    *         the entry to be skipped for replication.
    */
-  public Entry filter(Entry entry);
+  Entry filter(Entry entry);
 }
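
The javadoc above gives the whole contract: return the (possibly modified) entry to keep it, or null (or an entry with no cells) to have it skipped. As a minimal, hypothetical illustration (not part of this commit), a custom filter following the same pattern as SystemTableWALEntryFilter could skip edits for one named table:

    package org.apache.hadoop.hbase.replication;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.wal.WAL.Entry;
    import org.apache.yetus.audience.InterfaceAudience;

    /**
     * Hypothetical example only: skips WAL edits for a single table.
     * Returning null drops the entry from replication; returning it keeps it.
     */
    @InterfaceAudience.Private
    public class SingleTableWALEntryFilter implements WALEntryFilter {
      private final TableName skipTable;

      public SingleTableWALEntryFilter(TableName skipTable) {
        this.skipTable = skipTable;
      }

      @Override
      public Entry filter(Entry entry) {
        return entry.getKey().getTableName().equals(skipTable) ? null : entry;
      }
    }
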
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c283b5c..23b7029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,12 +18,12 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +36,7 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,15 +64,15 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
  * Class that handles the source of a replication stream.
- * Currently does not handle more than 1 slave
+ * Currently does not handle more than 1 slave cluster.
  * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if replication ratio = 0.1
  * and slave cluster has 100 region servers, 10 will be selected.
@@ -120,8 +121,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private int logQueueWarnThreshold;
   // ReplicationEndpoint which will handle the actual replication
   private volatile ReplicationEndpoint replicationEndpoint;
-  // A filter (or a chain of filters) for the WAL entries.
+
+  /**
+   * A filter (or a chain of filters) for WAL entries; filters out edits.
+   */
   protected volatile WALEntryFilter walEntryFilter;
+
   // throttler
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
@@ -140,6 +145,39 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private Thread initThread;
 
   /**
+   * WALs to replicate.
+   * Predicate that returns 'true' for WALs to replicate and false for WALs to skip.
+   */
+  private final Predicate<Path> filterInWALs;
+
+  /**
+   * Base WALEntry filters for this class. Unmodifiable. Set on construction.
+   * Filters *out* edits we do not want replicated, passed on to replication endpoints.
+   * This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of
+   * the WALEntry filter chain. These are put after those that we pick up from the configured
+   * endpoints and other machinations to create the final {@link #walEntryFilter}.
+   * @see WALEntryFilter
+   */
+  private final List<WALEntryFilter> baseFilterOutWALEntries;
+
+  ReplicationSource() {
+    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
+    this(p -> !AbstractFSWALProvider.isMetaFile(p),
+      Lists.newArrayList(new SystemTableWALEntryFilter()));
+  }
+
+  /**
+   * @param replicateWAL Pass a filter to run against WAL Path; filter *in* WALs to Replicate;
+   *   i.e. return 'true' if you want to replicate the content of the WAL.
+   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
+   *   WALEntries so they never make it out of this ReplicationSource.
+   */
+  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
+    this.filterInWALs = replicateWAL;
+    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
+  }
+
+  /**
    * Instantiation method used by region servers
    * @param conf configuration to use
    * @param fs file system to use
@@ -194,26 +232,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
-  public void enqueueLog(Path log) {
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
+  public void enqueueLog(Path wal) {
+    if (!this.filterInWALs.test(wal)) {
+      LOG.trace("NOT replicating {}", wal);
+      return;
+    }
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
       queues.put(logPrefix, queue);
       if (this.isSourceActive() && this.walEntryFilter != null) {
         // new wal group observed after source startup, start a new worker thread to track it
-        // notice: it's possible that log enqueued when this.running is set but worker thread
+        // notice: it's possible that wal enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
         tryStartNewShipper(logPrefix, queue);
       }
     }
-    queue.put(log);
+    queue.put(wal);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
+      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix,
         this.replicationQueueInfo.getQueueId());
     }
     this.metrics.incrSizeOfLogQueue();
-    // This will log a warning for each new log that gets created above the warn threshold
+    // This will log a warning for each new wal that gets created above the warn threshold
     int queueSize = queue.size();
     if (queueSize > this.logQueueWarnThreshold) {
       LOG.warn("{} WAL group {} queue size: {} exceeds value of "
@@ -302,8 +344,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   private void initializeWALEntryFilter(UUID peerClusterId) {
     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
-    ArrayList<WALEntryFilter> filters =
-      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
+    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
     if (filterFromEndpoint != null) {
       filters.add(filterFromEndpoint);
@@ -392,6 +433,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
       : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
   }
 
+  /**
+   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
+   * @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering
+   * out WALEntry edits.
+   */
+  @VisibleForTesting
+  WALEntryFilter getWalEntryFilter() {
+    return walEntryFilter;
+  }
+
   protected final void uncaughtException(Thread t, Throwable e) {
     RSRpcServices.exitIfOOME(e);
     LOG.error("Unexpected exception in {} currentPath={}",
@@ -623,7 +674,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
       }
     }
     if (clearMetrics) {
-      this.metrics.clear();
+      // Can be null in test context.
+      if (this.metrics != null) {
+        this.metrics.clear();
+      }
     }
   }
 
@@ -653,10 +707,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return !this.server.isStopped() && this.sourceRunning;
   }
 
-  public UUID getPeerClusterUUID(){
-    return this.clusterId;
-  }
-
   /**
    * Comparator used to compare logs together based on their start time
    */
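
Tying the pieces together: enqueueLog() now consults the filter-in predicate before queueing a WAL, so "filter WALs by name" (per the commit message) amounts to passing a Path predicate to the new constructor. A hypothetical same-package sketch (not part of this commit), reusing helpers that appear in this diff:

    package org.apache.hadoop.hbase.replication.regionserver;

    import java.util.function.Predicate;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
    import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
    import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

    // Hypothetical example class, not part of this commit.
    final class NamedWALReplicationSourceExample {
      /**
       * Build a source that replicates only WALs carrying the given WAL-group
       * prefix, still skipping meta WALs and system-table entries like the default.
       */
      static ReplicationSource forWALPrefix(String walPrefix) {
        Predicate<Path> filterInWALs = p -> !AbstractFSWALProvider.isMetaFile(p)
          && walPrefix.equals(AbstractFSWALProvider.getWALPrefixFromWALName(p.getName()));
        return new ReplicationSource(filterInWALs,
          Lists.newArrayList(new SystemTableWALEntryFilter()));
      }
    }
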
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index ce6770f..8e96262 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -45,7 +45,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
@@ -308,7 +307,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
    * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
    * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
    * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
-   * @param conf
    * @param serverName Server name formatted as described in {@link ServerName}
    * @return the relative WAL directory name
    */
@@ -414,11 +412,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     return isMetaFile(p.getName());
   }
 
+  /**
+   * @return True if String ends in {@link #META_WAL_PROVIDER_ID}
+   */
   public static boolean isMetaFile(String p) {
-    if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
-      return true;
-    }
-    return false;
+    return p != null && p.endsWith(META_WAL_PROVIDER_ID);
   }
 
   public static boolean isArchivedLogFile(Path p) {
@@ -461,12 +459,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
    * @param path path to WAL file
    * @param conf configuration
    * @return WAL Reader instance
-   * @throws IOException
    */
   public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
-      throws IOException
-
-  {
+      throws IOException {
     long retryInterval = 2000; // 2 sec
     int maxAttempts = 30;
     int attempt = 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 274ccab..6323da3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,13 +16,14 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.replication.regionserver;
-
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-
+import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.OptionalLong;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -41,19 +42,14 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -63,7 +59,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -96,9 +91,13 @@ public class TestReplicationSource {
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
     Path rootDir = TEST_UTIL.createRootDir();
     oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
+    if (FS.exists(oldLogDir)) {
+      FS.delete(oldLogDir, true);
+    }
     logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
-    if (FS.exists(logDir)) FS.delete(logDir, true);
+    if (FS.exists(logDir)) {
+      FS.delete(logDir, true);
+    }
   }
 
   @AfterClass
@@ -109,15 +108,99 @@ public class TestReplicationSource {
   }
 
   /**
+   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
+   */
+  @Test
+  public void testDefaultSkipsMetaWAL() throws IOException {
+    ReplicationSource rs = new ReplicationSource();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+    Mockito.when(peerConfig.getReplicationEndpointImpl()).
+      thenReturn(DoNothingReplicationEndpoint.class.getName());
+    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    String queueId = "qid";
+    RegionServerServices rss =
+      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+      p -> OptionalLong.empty(), new MetricsSource(queueId));
+    try {
+      rs.startup();
+      assertTrue(rs.isSourceActive());
+      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
+      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+      rs.enqueueLog(new Path("a.1"));
+      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
+    } finally {
+      rs.terminate("Done");
+      rss.stop("Done");
+    }
+  }
+
+  /**
+   * Test that we filter out meta edits, etc.
+   */
+  @Test
+  public void testWALEntryFilter() throws IOException {
+    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
+    // instance and init it.
+    ReplicationSource rs = new ReplicationSource();
+    UUID uuid = UUID.randomUUID();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+    Mockito.when(peerConfig.getReplicationEndpointImpl()).
+      thenReturn(DoNothingReplicationEndpoint.class.getName());
+    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    String queueId = "qid";
+    RegionServerServices rss =
+      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+      uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
+    try {
+      rs.startup();
+      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
+      WALEntryFilter wef = rs.getWalEntryFilter();
+      // Test non-system WAL edit.
+      WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY,
+        TableName.valueOf("test"), -1), new WALEdit());
+      assertTrue(wef.filter(e) == e);
+      // Test system WAL edit.
+      e = new WAL.Entry(
+        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1),
+        new WALEdit());
+      assertNull(wef.filter(e));
+    } finally {
+      rs.terminate("Done");
+      rss.stop("Done");
+    }
+  }
+
+  /**
    * Sanity check that we can move logs around while we are reading
    * from them. Should this test fail, ReplicationSource would have a hard
    * time reading logs that are being archived.
    */
+  // This test doesn't belong in here... it is not about ReplicationSource.
   @Test
   public void testLogMoving() throws Exception{
     Path logPath = new Path(logDir, "log");
-    if (!FS.exists(logDir)) FS.mkdirs(logDir);
-    if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
+    if (!FS.exists(logDir)) {
+      FS.mkdirs(logDir);
+    }
+    if (!FS.exists(oldLogDir)) {
+      FS.mkdirs(oldLogDir);
+    }
     WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
         TEST_UTIL.getConfiguration());
     for(int i = 0; i < 3; i++) {
@@ -142,7 +225,7 @@ public class TestReplicationSource {
     entry = reader.next();
     assertNotNull(entry);
 
-    entry = reader.next();
+    reader.next();
     entry = reader.next();
 
     assertNull(entry);
@@ -151,47 +234,31 @@ public class TestReplicationSource {
 
   /**
    * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
+   * Moved here from TestReplicationSource because it doesn't need a cluster.
    */
   @Test
   public void testTerminateTimeout() throws Exception {
     ReplicationSource source = new ReplicationSource();
-    ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
-      @Override
-      protected void doStart() {
-        notifyStarted();
-      }
-
-      @Override
-      protected void doStop() {
-        // not calling notifyStopped() here causes the caller of stop() to get a Future that never
-        // completes
-      }
-    };
-    replicationEndpoint.start();
-    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
-    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
-    Configuration testConf = HBaseConfiguration.create();
-    testConf.setInt("replication.source.maxretriesmultiplier", 1);
-    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-    source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
-      p -> OptionalLong.empty(), null);
-    ExecutorService executor = Executors.newSingleThreadExecutor();
-    Future<?> future = executor.submit(new Runnable() {
-
-      @Override
-      public void run() {
-        source.terminate("testing source termination");
-      }
-    });
-    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
-    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return future.isDone();
-      }
-    });
+    ReplicationEndpoint
+      replicationEndpoint = new DoNothingReplicationEndpoint();
+    try {
+      replicationEndpoint.start();
+      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+      Configuration testConf = HBaseConfiguration.create();
+      testConf.setInt("replication.source.maxretriesmultiplier", 1);
+      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+      source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
+        null, p -> OptionalLong.empty(), null);
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      Future<?> future = executor.submit(
+        () -> source.terminate("testing source termination"));
+      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
+      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
+    } finally {
+      replicationEndpoint.stop();
+    }
   }
 
   /**
@@ -211,10 +278,10 @@ public class TestReplicationSource {
 
       HRegionServer serverA = cluster.getRegionServer(0);
       final ReplicationSourceManager managerA =
-          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+          serverA.getReplicationSourceService().getReplicationManager();
       HRegionServer serverB = cluster.getRegionServer(1);
       final ReplicationSourceManager managerB =
-          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+          serverB.getReplicationSourceService().getReplicationManager();
       final Admin admin = TEST_UTIL.getAdmin();
 
       final String peerId = "TestPeer";
@@ -222,7 +289,7 @@ public class TestReplicationSource {
         ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
       // Wait for replication sources to come up
       Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
+        @Override public boolean evaluate() {
           return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
         }
       });
@@ -257,17 +324,11 @@ public class TestReplicationSource {
       // 1. The serverB's normal queue
       // 2. serverA's recovered queue on serverB
       cluster.stopRegionServer(serverB.getServerName());
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
-          return managerC.getOldSources().size() == 2;
-        }
-      });
+      Waiter.waitFor(conf, 20000,
+        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
       admin.enableReplicationPeer(peerId);
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
-          return managerC.getOldSources().size() == 0;
-        }
-      });
+      Waiter.waitFor(conf, 20000,
+        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
     } finally {
       conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
     }
@@ -277,7 +338,7 @@ public class TestReplicationSource {
    * Regionserver implementation that adds a delay on the graceful shutdown.
    */
   public static class ShutdownDelayRegionServer extends HRegionServer {
-    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
       super(conf);
     }
 
@@ -295,7 +356,39 @@ public class TestReplicationSource {
     }
   }
 
-  // Test HBASE-20497
+  /**
+   * Deadend Endpoint. Does nothing.
+   */
+  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
+    private final UUID uuid = UUID.randomUUID();
+
+    @Override public void init(Context context) throws IOException {
+      this.ctx = context;
+    }
+
+    @Override public WALEntryFilter getWALEntryfilter() {
+      return null;
+    }
+
+    @Override public synchronized UUID getPeerUUID() {
+      return this.uuid;
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  /**
+   * Test HBASE-20497
+   * Moved here from TestReplicationSource because it doesn't need a cluster.
+   */
   @Test
   public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
     String walGroupId = "fake-wal-group-id";
@@ -310,14 +403,14 @@ public class TestReplicationSource {
     Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
     ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
     Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
-        .thenReturn(1001L);
+      .thenReturn(1001L);
     Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
-        .thenReturn(-1L);
+      .thenReturn(-1L);
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.setInt("replication.source.maxretriesmultiplier", -1);
     RecoveredReplicationSourceShipper shipper =
-        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
-    Assert.assertEquals(1001L, shipper.getStartPosition());
-    conf.unset("replication.source.maxretriesmultiplier");
+      new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
+    assertEquals(1001L, shipper.getStartPosition());
   }
 }