Posted to commits@hbase.apache.org by zg...@apache.org on 2017/06/01 06:56:02 UTC

hbase git commit: HBASE-18130 Refactor ReplicationSource

Repository: hbase
Updated Branches:
  refs/heads/master db8ce0566 -> 123086eda


HBASE-18130 Refactor ReplicationSource


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/123086ed
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/123086ed
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/123086ed

Branch: refs/heads/master
Commit: 123086edad75308f8682a491ff8affa545babe4c
Parents: db8ce05
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun May 28 16:39:59 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Jun 1 14:50:45 2017 +0800

----------------------------------------------------------------------
 .../RecoveredReplicationSource.java             | 182 ++++++
 ...RecoveredReplicationSourceShipperThread.java | 147 +++++
 .../regionserver/ReplicationSource.java         | 626 ++++---------------
 .../regionserver/ReplicationSourceFactory.java  |  55 ++
 .../ReplicationSourceInterface.java             |  65 +-
 .../regionserver/ReplicationSourceManager.java  |  23 +-
 .../ReplicationSourceShipperThread.java         | 336 ++++++++++
 .../ReplicationSourceWALReaderThread.java       |   1 +
 .../replication/ReplicationSourceDummy.java     |  36 +-
 .../TestReplicationSourceManager.java           |   2 +-
 10 files changed, 930 insertions(+), 543 deletions(-)
----------------------------------------------------------------------
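
For orientation before the per-file diffs: the refactor splits the old monolithic
ReplicationSource so that a factory picks the source class based on whether the queue is
recovered, and each source runs one shipper thread per wal group, with each shipper paired
with a WAL reader thread. The following minimal, standalone Java sketch models that shape;
every class and method name in it is a simplified stand-in, not an HBase type.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical, simplified model of the refactored structure.
    class ReplicationStructureSketch {
      interface Source {
        void enqueueLog(String walGroupId, String log);
      }

      static class NormalSource implements Source {
        protected final Map<String, Shipper> workers = new ConcurrentHashMap<>();

        @Override
        public void enqueueLog(String walGroupId, String log) {
          // one shipper (paired with a WAL reader) per wal group;
          // computeIfAbsent plays the role of the putIfAbsent race guard
          workers.computeIfAbsent(walGroupId, Shipper::new).accept(log);
        }
      }

      // recovered queues get a subclass carrying path-relocation and finish logic
      static class RecoveredSource extends NormalSource {
      }

      static class Shipper {
        Shipper(String walGroupId) {
        }

        void accept(String log) {
          // a real shipper takes batches from its reader thread and ships them
        }
      }

      // the factory chooses the class from the queue's recovered flag
      static Source create(boolean queueRecovered) {
        return queueRecovered ? new RecoveredSource() : new NormalSource();
      }
    }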


http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
new file mode 100644
index 0000000..388b8d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+
+/**
+ * Class that handles the recovered source of a replication stream, which is transferred from
+ * another dead region server. This will be closed when all logs are pushed to the peer cluster.
+ */
+@InterfaceAudience.Private
+public class RecoveredReplicationSource extends ReplicationSource {
+
+  private static final Log LOG = LogFactory.getLog(RecoveredReplicationSource.class);
+
+  private String actualPeerId;
+
+  @Override
+  public void init(final Configuration conf, final FileSystem fs,
+      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
+      final ReplicationPeers replicationPeers, final Stoppable stopper,
+      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      final MetricsSource metrics) throws IOException {
+    super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
+      clusterId, replicationEndpoint, metrics);
+    this.actualPeerId = this.replicationQueueInfo.getPeerId();
+  }
+
+  @Override
+  protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
+    final RecoveredReplicationSourceShipperThread worker =
+        new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this,
+            this.replicationQueues);
+    ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+    if (extant != null) {
+      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+    } else {
+      LOG.debug("Starting up worker for wal group " + walGroupId);
+      worker.startup(getUncaughtExceptionHandler());
+      worker.setWALReader(
+        startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition()));
+      workerThreads.put(walGroupId, worker);
+    }
+  }
+
+  public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
+    boolean hasPathChanged = false;
+    PriorityBlockingQueue<Path> newPaths =
+        new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+    pathsLoop: for (Path path : queue) {
+      if (fs.exists(path)) { // still in same location, don't need to do anything
+        newPaths.add(path);
+        continue;
+      }
+      // Path changed - try to find the right path.
+      hasPathChanged = true;
+      if (stopper instanceof ReplicationSyncUp.DummyServer) {
+        // In the case of disaster/recovery, HMaster may be shutdown/crashed before flushing data
+        // from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists
+        Path newPath = getReplSyncUpPath(path);
+        newPaths.add(newPath);
+        continue;
+      } else {
+        // See if Path exists in the dead RS folder (there could be a chain of failures
+        // to look at)
+        List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+        LOG.info("NB dead servers : " + deadRegionServers.size());
+        final Path walDir = FSUtils.getWALRootDir(conf);
+        for (String curDeadServerName : deadRegionServers) {
+          final Path deadRsDirectory =
+              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
+          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
+              deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
+          for (Path possibleLogLocation : locs) {
+            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+            if (manager.getFs().exists(possibleLogLocation)) {
+              // We found the right new location
+              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
+              newPaths.add(possibleLogLocation);
+              continue pathsLoop;
+            }
+          }
+        }
+        // didn't find a new location
+        LOG.error(
+          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
+        newPaths.add(path);
+      }
+    }
+
+    if (hasPathChanged) {
+      if (newPaths.size() != queue.size()) { // this shouldn't happen
+        LOG.error("Recovery queue size is incorrect");
+        throw new IOException("Recovery queue size error");
+      }
+      // put the correct locations in the queue
+      // since this is a recovered queue with no new incoming logs,
+      // there shouldn't be any concurrency issues
+      queue.clear();
+      for (Path path : newPaths) {
+        queue.add(path);
+      }
+    }
+  }
+
+  // N.B. the ReplicationSyncUp tool sets manager.getLogDir() to the root of the wal
+  // area rather than to the wal area for a particular region server.
+  private Path getReplSyncUpPath(Path path) throws IOException {
+    FileStatus[] rss = fs.listStatus(manager.getLogDir());
+    for (FileStatus rs : rss) {
+      Path p = rs.getPath();
+      FileStatus[] logs = fs.listStatus(p);
+      for (FileStatus log : logs) {
+        p = new Path(p, log.getPath().getName());
+        if (p.getName().equals(path.getName())) {
+          LOG.info("Log " + p.getName() + " found at " + p);
+          return p;
+        }
+      }
+    }
+    LOG.error("Didn't find path for: " + path.getName());
+    return path;
+  }
+
+  public void tryFinish() {
+    // use synchronization to make sure only the last remaining thread cleans the queue
+    synchronized (workerThreads) {
+      Threads.sleep(100); // wait a short while for other worker threads to fully exit
+      boolean allOtherTaskDone = true;
+      for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+        if (worker.isActive()) {
+          allOtherTaskDone = false;
+          break;
+        }
+      }
+      if (allOtherTaskDone) {
+        manager.closeRecoveredQueue(this);
+        LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
+            + getStats());
+      }
+    }
+  }
+
+  @Override
+  public String getPeerId() {
+    return this.actualPeerId;
+  }
+}
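
The locateRecoveredPaths() method above reduces to a probing pattern: for each queued WAL
that is no longer at its recorded path, look for a file of the same name under the WAL
directories of the dead servers in the failure chain, including their splitting variants.
A self-contained sketch of that pattern using java.nio.file; the "-splitting" suffix and
directory naming here are illustrative, not HBase's actual layout.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.Optional;

    class WalRelocatorSketch {
      // Probe candidate directories for a WAL that may have moved after its server died.
      static Optional<Path> relocate(Path original, List<Path> deadServerWalDirs) {
        if (Files.exists(original)) {
          return Optional.of(original); // still in place, nothing to do
        }
        String name = original.getFileName().toString();
        for (Path dir : deadServerWalDirs) {
          for (Path candidate : List.of(dir.resolve(name),
              Paths.get(dir + "-splitting").resolve(name))) {
            if (Files.exists(candidate)) {
              return Optional.of(candidate); // found the new location
            }
          }
        }
        return Optional.empty(); // caller logs an error and keeps the old path
      }
    }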

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
new file mode 100644
index 0000000..024b0c4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ *  Used by a {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread {
+
+  private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class);
+  protected final RecoveredReplicationSource source;
+  private final ReplicationQueues replicationQueues;
+
+  public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId,
+      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+      ReplicationQueues replicationQueues) {
+    super(conf, walGroupId, queue, source);
+    this.source = source;
+    this.replicationQueues = replicationQueues;
+  }
+
+  @Override
+  public void run() {
+    // Loop until we close down
+    while (isActive()) {
+      int sleepMultiplier = 1;
+      // Sleep until replication is enabled again
+      if (!source.isPeerEnabled()) {
+        if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+
+      while (entryReader == null) {
+        if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
+          sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+
+      try {
+        WALEntryBatch entryBatch = entryReader.take();
+        shipEdits(entryBatch);
+        if (entryBatch.getWalEntries().isEmpty()
+            && entryBatch.getLastSeqIds().isEmpty()) {
+          LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+              + source.getPeerClusterZnode());
+          source.getSourceMetrics().incrCompletedRecoveryQueue();
+          setWorkerRunning(false);
+          continue;
+        }
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while waiting for next replication entry batch", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    source.tryFinish();
+  }
+
+  @Override
+  public long getStartPosition() {
+    long startPosition = getRecoveredQueueStartPos();
+    int numRetries = 0;
+    while (numRetries <= maxRetriesMultiplier) {
+      try {
+        source.locateRecoveredPaths(queue);
+        break;
+      } catch (IOException e) {
+        LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
+        numRetries++;
+      }
+    }
+    return startPosition;
+  }
+
+  // If this is a recovered queue, the queue is already full and the first log
+  // normally has a position (unless the RS failed between 2 logs)
+  private long getRecoveredQueueStartPos() {
+    long startPosition = 0;
+    String peerClusterZnode = source.getPeerClusterZnode();
+    try {
+      startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
+        this.queue.peek().getName());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+            + startPosition);
+      }
+    } catch (ReplicationException e) {
+      terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+    }
+    return startPosition;
+  }
+
+  @Override
+  protected void updateLogPosition(long lastReadPosition) {
+    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
+      lastReadPosition, true, false);
+    lastLoggedPosition = lastReadPosition;
+  }
+
+  private void terminate(String reason, Exception cause) {
+    if (cause == null) {
+      LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
+
+    } else {
+      LOG.error("Closing worker for wal group " + this.walGroupId
+          + " because an error occurred: " + reason, cause);
+    }
+    entryReader.interrupt();
+    Threads.shutdown(entryReader, sleepForRetries);
+    this.interrupt();
+    Threads.shutdown(this, sleepForRetries);
+    LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+  }
+}
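
The run() loop above is a consume-and-finish pattern: block on the reader thread's batch
queue, ship each batch, and treat an empty batch as the signal that the recovered queue is
drained. A standalone sketch of that loop, where Batch and ship() are stand-ins for
WALEntryBatch and the replication endpoint call:

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class RecoveredShipperLoopSketch implements Runnable {
      static class Batch { // stand-in for WALEntryBatch
        final List<String> entries;
        Batch(List<String> entries) { this.entries = entries; }
        boolean isEmpty() { return entries.isEmpty(); }
      }

      private final BlockingQueue<Batch> reader = new LinkedBlockingQueue<>();
      private volatile boolean running = true;

      @Override
      public void run() {
        while (running) {
          try {
            Batch batch = reader.take(); // blocks until the reader hands over a batch
            ship(batch);                 // push the batch to the peer cluster
            if (batch.isEmpty()) {
              running = false;           // an empty batch means the queue is drained
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
        // at this point the real code calls source.tryFinish() to close the queue
      }

      private void ship(Batch batch) {
        // replicate the entries, then update the stored log position
      }
    }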

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 72da9bd..b86f35f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,9 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
@@ -42,19 +39,14 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -66,11 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -96,33 +84,30 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   // each presents a queue for one wal group
   private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
   // per group queue size, keep no more than this number of logs in each wal group
-  private int queueSizePerGroup;
-  private ReplicationQueues replicationQueues;
+  protected int queueSizePerGroup;
+  protected ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
 
-  private Configuration conf;
-  private ReplicationQueueInfo replicationQueueInfo;
+  protected Configuration conf;
+  protected ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
 
-  String actualPeerId;
   // The manager of all sources to which we ping back our progress
-  private ReplicationSourceManager manager;
+  protected ReplicationSourceManager manager;
   // Should we stop everything?
-  private Stoppable stopper;
+  protected Stoppable stopper;
   // How long should we sleep for each retry
   private long sleepForRetries;
-  private FileSystem fs;
+  protected FileSystem fs;
   // id of this cluster
   private UUID clusterId;
   // id of the other cluster
   private UUID peerClusterId;
   // total number of edits we replicated
   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
-  // total number of edits we replicated
-  private AtomicLong totalReplicatedOperations = new AtomicLong(0);
   // The znode we currently play with
-  private String peerClusterZnode;
+  protected String peerClusterZnode;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Indicates if this particular source is running
@@ -139,7 +124,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
   private long currentBandwidth;
-  private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
+      new ConcurrentHashMap<>();
 
   private AtomicLong totalBufferUsed;
 
@@ -182,8 +168,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-    this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
 
@@ -213,15 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that log enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
-        final ReplicationSourceShipperThread worker =
-            new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
-        ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
-        if (extant != null) {
-          LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
-        } else {
-          LOG.debug("Starting up worker for wal group " + logPrefix);
-          worker.startup();
-        }
+        tryStartNewShipperThread(logPrefix, queue);
       }
     }
     queue.put(log);
@@ -262,15 +238,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     }
   }
 
-  private void uninitialize() {
-    LOG.debug("Source exiting " + this.peerId);
-    metrics.clear();
-    if (replicationEndpoint.state() == Service.State.STARTING
-        || replicationEndpoint.state() == Service.State.RUNNING) {
-      replicationEndpoint.stopAndWait();
-    }
-  }
-
   @Override
   public void run() {
     // mark we are running now
@@ -322,15 +289,98 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
       String walGroupId = entry.getKey();
       PriorityBlockingQueue<Path> queue = entry.getValue();
-      final ReplicationSourceShipperThread worker =
-          new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
-      ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
-      if (extant != null) {
-        LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
-      } else {
-        LOG.debug("Starting up worker for wal group " + walGroupId);
-        worker.startup();
+      tryStartNewShipperThread(walGroupId, queue);
+    }
+  }
+
+  protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
+    final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf,
+        walGroupId, queue, this);
+    ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+    if (extant != null) {
+      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+    } else {
+      LOG.debug("Starting up worker for wal group " + walGroupId);
+      worker.startup(getUncaughtExceptionHandler());
+      worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue,
+        worker.getStartPosition()));
+      workerThreads.put(walGroupId, worker);
+    }
+  }
+
+  protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName,
+      String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
+      new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
+    ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
+    ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager,
+        replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
+    Threads.setDaemonThreadRunning(walReader, threadName
+        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+      getUncaughtExceptionHandler());
+    return walReader;
+  }
+
+  public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return new Thread.UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(final Thread t, final Throwable e) {
+        RSRpcServices.exitIfOOME(e);
+        LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+        stopper.stop("Unexpected exception in " + t.getName());
       }
+    };
+  }
+
+  @Override
+  public ReplicationEndpoint getReplicationEndpoint() {
+    return this.replicationEndpoint;
+  }
+
+  @Override
+  public ReplicationSourceManager getSourceManager() {
+    return this.manager;
+  }
+
+  @Override
+  public void tryThrottle(int batchSize) throws InterruptedException {
+    checkBandwidthChangeAndResetThrottler();
+    if (throttler.isEnabled()) {
+      long sleepTicks = throttler.getNextSleepInterval(batchSize);
+      if (sleepTicks > 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
+        }
+        Thread.sleep(sleepTicks);
+        // reset throttler's cycle start tick when sleep for throttling occurs
+        throttler.resetStartTick();
+      }
+    }
+  }
+
+  private void checkBandwidthChangeAndResetThrottler() {
+    long peerBandwidth = getCurrentBandwidth();
+    if (peerBandwidth != currentBandwidth) {
+      currentBandwidth = peerBandwidth;
+      throttler.setBandwidth((double) currentBandwidth / 10.0);
+      LOG.info("ReplicationSource : " + peerId
+          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
+    }
+  }
+
+  private long getCurrentBandwidth() {
+    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
+    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
+    // user can set peer bandwidth to 0 to use default bandwidth
+    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
+  }
+
+  private void uninitialize() {
+    LOG.debug("Source exiting " + this.peerId);
+    metrics.clear();
+    if (replicationEndpoint.state() == Service.State.STARTING
+        || replicationEndpoint.state() == Service.State.RUNNING) {
+      replicationEndpoint.stopAndWait();
     }
   }
 
@@ -358,7 +408,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    *
    * @return true if the peer is enabled, otherwise false
    */
-  protected boolean isPeerEnabled() {
+  @Override
+  public boolean isPeerEnabled() {
     return this.replicationPeers.getStatusOfPeer(this.peerId);
   }
 
@@ -428,7 +479,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   }
 
   @Override
-  public String getPeerClusterId() {
+  public String getPeerId() {
     return this.peerId;
   }
 
@@ -441,7 +492,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     return null;
   }
 
-  private boolean isSourceActive() {
+  @Override
+  public boolean isSourceActive() {
     return !this.stopper.isStopped() && this.sourceRunning;
   }
 
@@ -492,467 +544,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    * Get Replication Source Metrics
    * @return sourceMetrics
    */
+  @Override
   public MetricsSource getSourceMetrics() {
     return this.metrics;
   }
 
-  private long getCurrentBandwidth() {
-    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
-    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
-    // user can set peer bandwidth to 0 to use default bandwidth
-    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
-  }
-
-  // This thread reads entries from a queue and ships them.
-  // Entries are placed onto the queue by ReplicationSourceWALReaderThread
-  public class ReplicationSourceShipperThread extends Thread {
-    ReplicationSourceInterface source;
-    String walGroupId;
-    PriorityBlockingQueue<Path> queue;
-    ReplicationQueueInfo replicationQueueInfo;
-    // Last position in the log that we sent to ZooKeeper
-    private long lastLoggedPosition = -1;
-    // Path of the current log
-    private volatile Path currentPath;
-    // Indicates whether this particular worker is running
-    private boolean workerRunning = true;
-    ReplicationSourceWALReaderThread entryReader;
-    // Use guava cache to set ttl for each key
-    private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
-        .expireAfterAccess(1, TimeUnit.DAYS).build(
-        new CacheLoader<String, Boolean>() {
-          @Override
-          public Boolean load(String key) throws Exception {
-            return false;
-          }
-        }
-    );
-
-    public ReplicationSourceShipperThread(String walGroupId,
-        PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
-        ReplicationSourceInterface source) {
-      this.walGroupId = walGroupId;
-      this.queue = queue;
-      this.replicationQueueInfo = replicationQueueInfo;
-      this.source = source;
-    }
-
-    @Override
-    public void run() {
-      // Loop until we close down
-      while (isWorkerActive()) {
-        int sleepMultiplier = 1;
-        // Sleep until replication is enabled again
-        if (!isPeerEnabled()) {
-          if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-            sleepMultiplier++;
-          }
-          continue;
-        }
-        while (entryReader == null) {
-          if (sleepForRetries("Replication WAL entry reader thread not initialized",
-            sleepMultiplier)) {
-            sleepMultiplier++;
-          }
-          if (sleepMultiplier == maxRetriesMultiplier) {
-            LOG.warn("Replication WAL entry reader thread not initialized");
-          }
-        }
-
-        try {
-          WALEntryBatch entryBatch = entryReader.take();
-          for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
-            waitingUntilCanPush(entry);
-          }
-          shipEdits(entryBatch);
-          releaseBufferQuota((int) entryBatch.getHeapSize());
-          if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
-              && entryBatch.getLastSeqIds().isEmpty()) {
-            LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-                + peerClusterZnode);
-            metrics.incrCompletedRecoveryQueue();
-            setWorkerRunning(false);
-            continue;
-          }
-        } catch (InterruptedException e) {
-          LOG.trace("Interrupted while waiting for next replication entry batch", e);
-          Thread.currentThread().interrupt();
-        }
-      }
-
-      if (replicationQueueInfo.isQueueRecovered()) {
-        // use synchronize to make sure one last thread will clean the queue
-        synchronized (workerThreads) {
-          Threads.sleep(100);// wait a short while for other worker thread to fully exit
-          boolean allOtherTaskDone = true;
-          for (ReplicationSourceShipperThread worker : workerThreads.values()) {
-            if (!worker.equals(this) && worker.isAlive()) {
-              allOtherTaskDone = false;
-              break;
-            }
-          }
-          if (allOtherTaskDone) {
-            manager.closeRecoveredQueue(this.source);
-            LOG.info("Finished recovering queue " + peerClusterZnode
-                + " with the following stats: " + getStats());
-          }
-        }
-      }
-    }
-
-    private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
-      String key = entry.getKey();
-      long seq = entry.getValue();
-      boolean deleteKey = false;
-      if (seq <= 0) {
-        // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
-        deleteKey = true;
-        seq = -seq;
-      }
-
-      if (!canSkipWaitingSet.getUnchecked(key)) {
-        try {
-          manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
-        } catch (IOException e) {
-          LOG.error("waitUntilCanBePushed fail", e);
-          stopper.stop("waitUntilCanBePushed fail");
-        } catch (InterruptedException e) {
-          LOG.warn("waitUntilCanBePushed interrupted", e);
-          Thread.currentThread().interrupt();
-        }
-        canSkipWaitingSet.put(key, true);
-      }
-      if (deleteKey) {
-        canSkipWaitingSet.invalidate(key);
-      }
-    }
-
-    private void cleanUpHFileRefs(WALEdit edit) throws IOException {
-      String peerId = peerClusterZnode;
-      if (peerId.contains("-")) {
-        // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-        // A peerId will not have "-" in its name, see HBASE-11394
-        peerId = peerClusterZnode.split("-")[0];
-      }
-      List<Cell> cells = edit.getCells();
-      int totalCells = cells.size();
-      for (int i = 0; i < totalCells; i++) {
-        Cell cell = cells.get(i);
-        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-          List<StoreDescriptor> stores = bld.getStoresList();
-          int totalStores = stores.size();
-          for (int j = 0; j < totalStores; j++) {
-            List<String> storeFileList = stores.get(j).getStoreFileList();
-            manager.cleanUpHFileRefs(peerId, storeFileList);
-            metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
-          }
-        }
-      }
-    }
-
-    private void checkBandwidthChangeAndResetThrottler() {
-      long peerBandwidth = getCurrentBandwidth();
-      if (peerBandwidth != currentBandwidth) {
-        currentBandwidth = peerBandwidth;
-        throttler.setBandwidth((double) currentBandwidth / 10.0);
-        LOG.info("ReplicationSource : " + peerId
-            + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
-      }
-    }
-
-    /**
-     * Do the shipping logic
-     */
-    protected void shipEdits(WALEntryBatch entryBatch) {
-      List<Entry> entries = entryBatch.getWalEntries();
-      long lastReadPosition = entryBatch.getLastWalPosition();
-      currentPath = entryBatch.getLastWalPath();
-      int sleepMultiplier = 0;
-      if (entries.isEmpty()) {
-        if (lastLoggedPosition != lastReadPosition) {
-          // Save positions to meta table before zk.
-          updateSerialRepPositions(entryBatch.getLastSeqIds());
-          updateLogPosition(lastReadPosition);
-          // if there was nothing to ship and it's not an error
-          // set "ageOfLastShippedOp" to <now> to indicate that we're current
-          metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
-        }
-        return;
-      }
-      int currentSize = (int) entryBatch.getHeapSize();
-      while (isWorkerActive()) {
-        try {
-          checkBandwidthChangeAndResetThrottler();
-          if (throttler.isEnabled()) {
-            long sleepTicks = throttler.getNextSleepInterval(currentSize);
-            if (sleepTicks > 0) {
-              try {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
-                }
-                Thread.sleep(sleepTicks);
-              } catch (InterruptedException e) {
-                LOG.debug("Interrupted while sleeping for throttling control");
-                Thread.currentThread().interrupt();
-                // current thread might be interrupted to terminate
-                // directly go back to while() for confirm this
-                continue;
-              }
-              // reset throttler's cycle start tick when sleep for throttling occurs
-              throttler.resetStartTick();
-            }
-          }
-          // create replicateContext here, so the entries can be GC'd upon return from this call
-          // stack
-          ReplicationEndpoint.ReplicateContext replicateContext =
-              new ReplicationEndpoint.ReplicateContext();
-          replicateContext.setEntries(entries).setSize(currentSize);
-          replicateContext.setWalGroupId(walGroupId);
-
-          long startTimeNs = System.nanoTime();
-          // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
-          boolean replicated = replicationEndpoint.replicate(replicateContext);
-          long endTimeNs = System.nanoTime();
-
-          if (!replicated) {
-            continue;
-          } else {
-            sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
-          }
-
-          if (this.lastLoggedPosition != lastReadPosition) {
-            //Clean up hfile references
-            int size = entries.size();
-            for (int i = 0; i < size; i++) {
-              cleanUpHFileRefs(entries.get(i).getEdit());
-            }
-
-            // Save positions to meta table before zk.
-            updateSerialRepPositions(entryBatch.getLastSeqIds());
-
-            //Log and clean up WAL logs
-            updateLogPosition(lastReadPosition);
-          }
-          if (throttler.isEnabled()) {
-            throttler.addPushSize(currentSize);
-          }
-          totalReplicatedEdits.addAndGet(entries.size());
-          totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
-          // FIXME check relationship between wal group and overall
-          metrics.shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles());
-          metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
-            walGroupId);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
-                + totalReplicatedOperations + " operations in "
-                + ((endTimeNs - startTimeNs) / 1000000) + " ms");
-          }
-          break;
-        } catch (Exception ex) {
-          LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
-              + org.apache.hadoop.util.StringUtils.stringifyException(ex));
-          if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
-            sleepMultiplier++;
-          }
-        }
-      }
-    }
-
-    private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
-      try {
-        MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
-          lastPositionsForSerialScope);
-      } catch (IOException e) {
-        LOG.error("updateReplicationPositions fail", e);
-        stopper.stop("updateReplicationPositions fail");
-      }
-    }
-
-    private void updateLogPosition(long lastReadPosition) {
-      manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
-        this.replicationQueueInfo.isQueueRecovered(), false);
-      lastLoggedPosition = lastReadPosition;
-    }
-
-    public void startup() {
-      String n = Thread.currentThread().getName();
-      Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
-        @Override
-        public void uncaughtException(final Thread t, final Throwable e) {
-          RSRpcServices.exitIfOOME(e);
-          LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
-              + getCurrentPath(), e);
-          stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
-        }
-      };
-      Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
-          + peerClusterZnode, handler);
-      workerThreads.put(walGroupId, this);
-
-      long startPosition = 0;
-
-      if (this.replicationQueueInfo.isQueueRecovered()) {
-        startPosition = getRecoveredQueueStartPos(startPosition);
-        int numRetries = 0;
-        while (numRetries <= maxRetriesMultiplier) {
-          try {
-            locateRecoveredPaths();
-            break;
-          } catch (IOException e) {
-            LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
-            numRetries++;
-          }
-        }
-      }
-
-      startWALReaderThread(n, handler, startPosition);
-    }
-
-    // If this is a recovered queue, the queue is already full and the first log
-    // normally has a position (unless the RS failed between 2 logs)
-    private long getRecoveredQueueStartPos(long startPosition) {
-      try {
-        startPosition =
-            (replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName()));
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
-              + startPosition);
-        }
-      } catch (ReplicationException e) {
-        terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
-      }
-      return startPosition;
-    }
-
-    // start a background thread to read and batch entries
-    private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHandler handler,
-        long startPosition) {
-      ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
-        new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
-      ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
-      entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
-          startPosition, fs, conf, readerFilter, metrics);
-      Threads.setDaemonThreadRunning(entryReader, threadName
-          + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
-        handler);
-    }
-
-    // Loops through the recovered queue and tries to find the location of each log
-    // this is necessary because the logs may have moved before recovery was initiated
-    private void locateRecoveredPaths() throws IOException {
-      boolean hasPathChanged = false;
-      PriorityBlockingQueue<Path> newPaths =
-          new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
-      pathsLoop: for (Path path : queue) {
-        if (fs.exists(path)) { // still in same location, don't need to do anything
-          newPaths.add(path);
-          continue;
-        }
-        // Path changed - try to find the right path.
-        hasPathChanged = true;
-        if (stopper instanceof ReplicationSyncUp.DummyServer) {
-          // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
-          // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
-          Path newPath = getReplSyncUpPath(path);
-          newPaths.add(newPath);
-          continue;
-        } else {
-          // See if Path exists in the dead RS folder (there could be a chain of failures
-          // to look at)
-          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
-          LOG.info("NB dead servers : " + deadRegionServers.size());
-          final Path walDir = FSUtils.getWALRootDir(conf);
-          for (String curDeadServerName : deadRegionServers) {
-            final Path deadRsDirectory =
-                new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
-            Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
-                deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
-            for (Path possibleLogLocation : locs) {
-              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-              if (manager.getFs().exists(possibleLogLocation)) {
-                // We found the right new location
-                LOG.info("Log " + path + " still exists at " + possibleLogLocation);
-                newPaths.add(possibleLogLocation);
-                continue pathsLoop;
-              }
-            }
-          }
-          // didn't find a new location
-          LOG.error(
-            String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
-          newPaths.add(path);
-        }
-      }
-
-      if (hasPathChanged) {
-        if (newPaths.size() != queue.size()) { // this shouldn't happen
-          LOG.error("Recovery queue size is incorrect");
-          throw new IOException("Recovery queue size error");
-        }
-        // put the correct locations in the queue
-        // since this is a recovered queue with no new incoming logs,
-        // there shouldn't be any concurrency issues
-        queue.clear();
-        for (Path path : newPaths) {
-          queue.add(path);
-        }
-      }
-    }
-
-    // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
-    // area rather than to the wal area for a particular region server.
-    private Path getReplSyncUpPath(Path path) throws IOException {
-      FileStatus[] rss = fs.listStatus(manager.getLogDir());
-      for (FileStatus rs : rss) {
-        Path p = rs.getPath();
-        FileStatus[] logs = fs.listStatus(p);
-        for (FileStatus log : logs) {
-          p = new Path(p, log.getPath().getName());
-          if (p.getName().equals(path.getName())) {
-            LOG.info("Log " + p.getName() + " found at " + p);
-            return p;
-          }
-        }
-      }
-      LOG.error("Didn't find path for: " + path.getName());
-      return path;
-    }
-
-    public Path getCurrentPath() {
-      return this.currentPath;
-    }
-
-    public long getCurrentPosition() {
-      return this.lastLoggedPosition;
-    }
-
-    private boolean isWorkerActive() {
-      return !stopper.isStopped() && workerRunning && !isInterrupted();
-    }
-
-    private void terminate(String reason, Exception cause) {
-      if (cause == null) {
-        LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
-
-      } else {
-        LOG.error("Closing worker for wal group " + this.walGroupId
-            + " because an error occurred: " + reason, cause);
-      }
-      entryReader.interrupt();
-      Threads.shutdown(entryReader, sleepForRetries);
-      this.interrupt();
-      Threads.shutdown(this, sleepForRetries);
-      LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
-    }
-
-    public void setWorkerRunning(boolean workerRunning) {
-      entryReader.setReaderRunning(workerRunning);
-      this.workerRunning = workerRunning;
-    }
-
-    private void releaseBufferQuota(int size) {
-      totalBufferUsed.addAndGet(-size);
+  @Override
+  public void postShipEdits(List<Entry> entries, int batchSize) {
+    if (throttler.isEnabled()) {
+      throttler.addPushSize(batchSize);
     }
+    totalReplicatedEdits.addAndGet(entries.size());
+    totalBufferUsed.addAndGet(-batchSize);
   }
 }
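
The new tryThrottle() hoists throttling out of the shipper: the throttler turns the peer's
configured bandwidth into a per-batch sleep interval and resets its cycle start tick whenever
a throttling sleep occurs. A rough standalone sketch of per-cycle budgeting under an assumed
100 ms cycle; this illustrates the idea only and is not ReplicationThrottler's exact
algorithm.

    // Illustrative per-cycle bandwidth throttling, assuming 10 cycles per second.
    class ThrottlerSketch {
      private final double bytesPerCycle; // budget per 100 ms cycle
      private long cycleStart = System.currentTimeMillis();
      private long pushedThisCycle = 0;

      ThrottlerSketch(long bandwidthBytesPerSec) {
        this.bytesPerCycle = bandwidthBytesPerSec / 10.0;
      }

      void tryThrottle(int batchSize) throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - cycleStart >= 100) { // a new cycle resets the budget
          cycleStart = now;
          pushedThisCycle = 0;
        }
        if (pushedThisCycle + batchSize > bytesPerCycle) {
          long sleepMs = 100 - (now - cycleStart); // wait out the rest of the cycle
          if (sleepMs > 0) {
            Thread.sleep(sleepMs);
          }
          cycleStart = System.currentTimeMillis(); // reset start tick after sleeping
          pushedThisCycle = 0;
        }
        pushedThisCycle += batchSize; // the postShipEdits-style accounting step
      }
    }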

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
new file mode 100644
index 0000000..ee01773
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+
+/**
+ * Constructs a {@link ReplicationSourceInterface}
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceFactory {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSourceFactory.class);
+
+  static ReplicationSourceInterface create(Configuration conf, String peerId) {
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+    boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
+    ReplicationSourceInterface src;
+    try {
+      String defaultReplicationSourceImpl =
+          isQueueRecovered ? RecoveredReplicationSource.class.getCanonicalName()
+              : ReplicationSource.class.getCanonicalName();
+      @SuppressWarnings("rawtypes")
+      Class c = Class.forName(
+        conf.get("replication.replicationsource.implementation", defaultReplicationSourceImpl));
+      src = (ReplicationSourceInterface) c.newInstance();
+    } catch (Exception e) {
+      LOG.warn("Passed replication source implementation throws errors, "
+          + "defaulting to ReplicationSource",
+        e);
+      src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource();
+    }
+    return src;
+  }
+}
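
The factory keeps the old pluggability: replication.replicationsource.implementation may name
a custom class, and any instantiation failure falls back to the built-in default. A
self-contained sketch of that reflection-with-fallback idiom, using java.util.Properties as a
stand-in for Hadoop's Configuration:

    import java.util.Properties;

    class PluggableFactorySketch {
      // Instantiate a configured implementation, falling back to a default on any error.
      static Runnable create(Properties conf, String key, String defaultClassName) {
        String className = conf.getProperty(key, defaultClassName);
        try {
          return (Runnable) Class.forName(className)
              .getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          // mirror the factory above: warn and fall back rather than fail hard
          try {
            return (Runnable) Class.forName(defaultClassName)
                .getDeclaredConstructor().newInstance();
          } catch (Exception fatal) {
            throw new IllegalStateException("Default implementation unusable", fatal);
          }
        }
      }
    }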

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 8d5451c..4912948 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Interface that defines a replication source
@@ -65,10 +66,15 @@ public interface ReplicationSourceInterface {
   void enqueueLog(Path log);
 
   /**
-   * Get the current log that's replicated
-   * @return the current log
+   * Add hfile names to the queue to be replicated.
+   * @param tableName Name of the table these files belong to
+   * @param family Name of the family these files belong to
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+   *          will be added to the queue for replication }
+   * @throws ReplicationException If failed to add hfile references
    */
-  Path getCurrentPath();
+  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
+      throws ReplicationException;
 
   /**
    * Start the replication
@@ -89,6 +95,12 @@ public interface ReplicationSourceInterface {
   void terminate(String reason, Exception cause);
 
   /**
+   * Get the current log that's replicated
+   * @return the current log
+   */
+  Path getCurrentPath();
+
+  /**
    * Get the id that the source is replicating to
    *
    * @return peer cluster id
@@ -98,9 +110,9 @@ public interface ReplicationSourceInterface {
   /**
    * Get the id that the source is replicating to.
    *
-   * @return peer cluster id
+   * @return peer id
    */
-  String getPeerClusterId();
+  String getPeerId();
 
   /**
    * Get a string representation of the current statistics
@@ -110,14 +122,41 @@ public interface ReplicationSourceInterface {
   String getStats();
 
   /**
-   * Add hfile names to the queue to be replicated.
-   * @param tableName Name of the table these files belongs to
-   * @param family Name of the family these files belong to
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue for replication}
-   * @throws ReplicationException If failed to add hfile references
+   * @return true if the peer is enabled, otherwise false
    */
-  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException;
+  boolean isPeerEnabled();
 
+  /**
+   * @return true if the source is active, otherwise false
+   */
+  boolean isSourceActive();
+
+  /**
+   * @return metrics of this replication source
+   */
+  MetricsSource getSourceMetrics();
+
+  /**
+   * @return the replication endpoint used by this replication source
+   */
+  ReplicationEndpoint getReplicationEndpoint();
+
+  /**
+   * @return the replication source manager
+   */
+  ReplicationSourceManager getSourceManager();
+
+  /**
+   * Try to throttle if the peer is configured with a bandwidth limit.
+   * @param batchSize total size of the entries that will be pushed
+   * @throws InterruptedException if interrupted while sleeping for throttling
+   */
+  void tryThrottle(int batchSize) throws InterruptedException;
+
+  /**
+   * Call this after the shipper thread ships some entries to the peer cluster.
+   * @param entries the entries that were pushed
+   * @param batchSize total size of the pushed entries
+   */
+  void postShipEdits(List<Entry> entries, int batchSize);
 }
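
The two new hooks, tryThrottle and postShipEdits, bracket the shipping call: throttle before
pushing, then account for shipped edits and release buffer quota once the peer acknowledges.
A sketch of the calling sequence a shipper would follow; SourceHooks is a stand-in for the
relevant slice of ReplicationSourceInterface and replicate() is a stub for the endpoint call.

    import java.util.List;

    // Illustrative calling sequence only; not the actual shipper implementation.
    class ShipperCallSequenceSketch {
      interface SourceHooks {
        void tryThrottle(int batchSize) throws InterruptedException;
        void postShipEdits(List<String> entries, int batchSize);
      }

      void shipOneBatch(SourceHooks source, List<String> entries, int batchSize)
          throws InterruptedException {
        source.tryThrottle(batchSize);           // may sleep if bandwidth-limited
        boolean replicated = replicate(entries); // blocks until the peer acknowledges
        if (replicated) {
          source.postShipEdits(entries, batchSize); // update totals, release quota
        }
      }

      private boolean replicate(List<String> entries) {
        return true; // stub for ReplicationEndpoint.replicate(...)
      }
    }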

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a38e264..cb631c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -464,17 +464,8 @@ public class ReplicationSourceManager implements ReplicationListener {
       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
-    ReplicationSourceInterface src;
-    try {
-      @SuppressWarnings("rawtypes")
-      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
-          ReplicationSource.class.getCanonicalName()));
-      src = (ReplicationSourceInterface) c.newInstance();
-    } catch (Exception e) {
-      LOG.warn("Passed replication source implementation throws errors, " +
-          "defaulting to ReplicationSource", e);
-      src = new ReplicationSource();
-    }
+
+    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);
 
     ReplicationEndpoint replicationEndpoint = null;
     try {
@@ -575,7 +566,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     synchronized (oldsources) {
       // First close all the recovered sources for this peer
       for (ReplicationSourceInterface src : oldsources) {
-        if (id.equals(src.getPeerClusterId())) {
+        if (id.equals(src.getPeerId())) {
           oldSourcesToDelete.add(src);
         }
       }
@@ -591,7 +582,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
     synchronized (this.replicationPeers) {
       for (ReplicationSourceInterface src : this.sources) {
-        if (id.equals(src.getPeerClusterId())) {
+        if (id.equals(src.getPeerId())) {
           srcToRemove.add(src);
         }
       }
@@ -752,7 +743,7 @@ public class ReplicationSourceManager implements ReplicationListener {
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {
-            if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
+            if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
               src.terminate("Recovered queue doesn't belong to any current peer");
               closeRecoveredQueue(src);
               continue;
@@ -834,11 +825,11 @@ public class ReplicationSourceManager implements ReplicationListener {
   public String getStats() {
     StringBuffer stats = new StringBuffer();
     for (ReplicationSourceInterface source : sources) {
-      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
+      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
       stats.append(source.getStats() + "\n");
     }
     for (ReplicationSourceInterface oldSource : oldsources) {
-      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
+      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId()+": ");
       stats.append(oldSource.getStats()+ "\n");
     }
     return stats.toString();
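
The reflection-and-fallback logic removed above presumably moves into the new
ReplicationSourceFactory, whose body is not shown in this excerpt. A plausible
sketch, assuming the factory keeps honoring the same configuration key and uses
the queue id to choose a recovered source (a recovered queue's id has the form
peerId + "-" + rsZNode, while a plain peerId never contains "-", see
HBASE-11394):

    // Sketch only; the real factory may differ. Uses ReplicationQueueInfo to
    // detect recovered queues by their id format.
    static ReplicationSourceInterface create(Configuration conf, String peerId) {
      boolean recovered = new ReplicationQueueInfo(peerId).isQueueRecovered();
      String defaultImpl = recovered
          ? RecoveredReplicationSource.class.getCanonicalName()
          : ReplicationSource.class.getCanonicalName();
      try {
        Class<?> c = Class.forName(
            conf.get("replication.replicationsource.implementation", defaultImpl));
        return (ReplicationSourceInterface) c.newInstance();
      } catch (Exception e) {
        // same fallback the manager used to apply inline
        return recovered ? new RecoveredReplicationSource() : new ReplicationSource();
      }
    }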

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
new file mode 100644
index 0000000..b0f7fee
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
+ * ReplicationSourceWALReaderThread.
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceShipperThread extends Thread {
+  private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
+
+  protected final Configuration conf;
+  protected final String walGroupId;
+  protected final PriorityBlockingQueue<Path> queue;
+  protected final ReplicationSourceInterface source;
+
+  // Last position in the log that we sent to ZooKeeper
+  protected long lastLoggedPosition = -1;
+  // Path of the current log
+  protected volatile Path currentPath;
+  // Indicates whether this particular worker is running
+  private boolean workerRunning = true;
+  protected ReplicationSourceWALReaderThread entryReader;
+
+  // How long should we sleep for each retry
+  protected final long sleepForRetries;
+  // Maximum number of retries before taking bold actions
+  protected final int maxRetriesMultiplier;
+
+  // Use a Guava cache to apply a TTL to each key
+  private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
+      .expireAfterAccess(1, TimeUnit.DAYS).build(
+      new CacheLoader<String, Boolean>() {
+        @Override
+        public Boolean load(String key) throws Exception {
+          return false;
+        }
+      }
+  );
+
+  public ReplicationSourceShipperThread(Configuration conf, String walGroupId,
+      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+    this.conf = conf;
+    this.walGroupId = walGroupId;
+    this.queue = queue;
+    this.source = source;
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
+    this.maxRetriesMultiplier =
+        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+  }
+
+  @Override
+  public void run() {
+    // Loop until we close down
+    while (isActive()) {
+      int sleepMultiplier = 1;
+      // Sleep until replication is enabled again
+      if (!source.isPeerEnabled()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+
+      while (entryReader == null) {
+        if (sleepForRetries("Replication WAL entry reader thread not initialized",
+          sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+
+      try {
+        WALEntryBatch entryBatch = entryReader.take();
+        for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
+          waitingUntilCanPush(entry);
+        }
+        shipEdits(entryBatch);
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while waiting for next replication entry batch", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Do the shipping logic
+   */
+  protected void shipEdits(WALEntryBatch entryBatch) {
+    List<Entry> entries = entryBatch.getWalEntries();
+    long lastReadPosition = entryBatch.getLastWalPosition();
+    currentPath = entryBatch.getLastWalPath();
+    int sleepMultiplier = 0;
+    if (entries.isEmpty()) {
+      if (lastLoggedPosition != lastReadPosition) {
+        // Save positions to meta table before zk.
+        updateSerialRepPositions(entryBatch.getLastSeqIds());
+        updateLogPosition(lastReadPosition);
+        // if there was nothing to ship and it's not an error
+        // set "ageOfLastShippedOp" to <now> to indicate that we're current
+        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+          walGroupId);
+      }
+      return;
+    }
+    int currentSize = (int) entryBatch.getHeapSize();
+    while (isActive()) {
+      try {
+        try {
+          source.tryThrottle(currentSize);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while sleeping for throttling control");
+          Thread.currentThread().interrupt();
+          // the current thread might have been interrupted to terminate;
+          // go back to the while() condition directly to confirm this
+          continue;
+        }
+
+        // create replicateContext here, so the entries can be GC'd upon return from this call
+        // stack
+        ReplicationEndpoint.ReplicateContext replicateContext =
+            new ReplicationEndpoint.ReplicateContext();
+        replicateContext.setEntries(entries).setSize(currentSize);
+        replicateContext.setWalGroupId(walGroupId);
+
+        long startTimeNs = System.nanoTime();
+        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
+        long endTimeNs = System.nanoTime();
+
+        if (!replicated) {
+          continue;
+        } else {
+          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
+        }
+
+        if (this.lastLoggedPosition != lastReadPosition) {
+          // Clean up hfile references
+          int size = entries.size();
+          for (int i = 0; i < size; i++) {
+            cleanUpHFileRefs(entries.get(i).getEdit());
+          }
+
+          // Save positions to meta table before zk.
+          updateSerialRepPositions(entryBatch.getLastSeqIds());
+          // Log and clean up WAL logs
+          updateLogPosition(lastReadPosition);
+        }
+
+        source.postShipEdits(entries, currentSize);
+        // FIXME check relationship between wal group and overall
+        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
+          entryBatch.getNbHFiles());
+        source.getSourceMetrics().setAgeOfLastShippedOp(
+          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
+              + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
+        }
+        break;
+      } catch (Exception ex) {
+        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
+            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+  }
+
+  private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
+    String key = entry.getKey();
+    long seq = entry.getValue();
+    boolean deleteKey = false;
+    if (seq <= 0) {
+      // There is a REGION_CLOSE marker; we cannot continue skipping after this entry.
+      deleteKey = true;
+      seq = -seq;
+    }
+
+    if (!canSkipWaitingSet.getUnchecked(key)) {
+      try {
+        source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
+      } catch (IOException e) {
+        LOG.error("waitUntilCanBePushed fail", e);
+        throw new RuntimeException("waitUntilCanBePushed fail");
+      } catch (InterruptedException e) {
+        LOG.warn("waitUntilCanBePushed interrupted", e);
+        Thread.currentThread().interrupt();
+      }
+      canSkipWaitingSet.put(key, true);
+    }
+    if (deleteKey) {
+      canSkipWaitingSet.invalidate(key);
+    }
+  }
+
+  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+    String peerId = source.getPeerId();
+    if (peerId.contains("-")) {
+      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+      // A peerId will not have "-" in its name, see HBASE-11394
+      peerId = peerId.split("-")[0];
+    }
+    List<Cell> cells = edit.getCells();
+    int totalCells = cells.size();
+    for (int i = 0; i < totalCells; i++) {
+      Cell cell = cells.get(i);
+      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+        List<StoreDescriptor> stores = bld.getStoresList();
+        int totalStores = stores.size();
+        for (int j = 0; j < totalStores; j++) {
+          List<String> storeFileList = stores.get(j).getStoreFileList();
+          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
+          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
+        }
+      }
+    }
+  }
+
+  protected void updateLogPosition(long lastReadPosition) {
+    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
+      lastReadPosition, false, false);
+    lastLoggedPosition = lastReadPosition;
+  }
+
+  private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
+    try {
+      MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
+        source.getPeerId(), lastPositionsForSerialScope);
+    } catch (IOException e) {
+      LOG.error("updateReplicationPositions fail", e);
+      throw new RuntimeException("updateReplicationPositions fail");
+    }
+  }
+
+  public void startup(UncaughtExceptionHandler handler) {
+    String name = Thread.currentThread().getName();
+    Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
+        + source.getPeerClusterZnode(), handler);
+  }
+
+  public PriorityBlockingQueue<Path> getLogQueue() {
+    return this.queue;
+  }
+
+  public Path getCurrentPath() {
+    return this.currentPath;
+  }
+
+  public long getCurrentPosition() {
+    return this.lastLoggedPosition;
+  }
+
+  public void setWALReader(ReplicationSourceWALReaderThread entryReader) {
+    this.entryReader = entryReader;
+  }
+
+  public long getStartPosition() {
+    return 0;
+  }
+
+  protected boolean isActive() {
+    return source.isSourceActive() && workerRunning && !isInterrupted();
+  }
+
+  public void setWorkerRunning(boolean workerRunning) {
+    entryReader.setReaderRunning(workerRunning);
+    this.workerRunning = workerRunning;
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier the multiplier applied to the base sleep time
+   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
+   */
+  public boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      }
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+      Thread.currentThread().interrupt();
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+}
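
For reference, a self-contained sketch of the Guava pattern behind
canSkipWaitingSet above: a LoadingCache whose loader defaults every key to
false, so getUnchecked() never misses, while expireAfterAccess acts as a
per-key TTL. The class name SkipSetDemo is illustrative only.

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class SkipSetDemo {
      public static void main(String[] args) {
        LoadingCache<String, Boolean> canSkip = CacheBuilder.newBuilder()
            .expireAfterAccess(1, TimeUnit.DAYS)
            .build(new CacheLoader<String, Boolean>() {
              @Override
              public Boolean load(String key) {
                return false; // unknown keys start out as "must wait"
              }
            });
        System.out.println(canSkip.getUnchecked("region-a")); // false, loaded default
        canSkip.put("region-a", true);                        // mark as skippable
        System.out.println(canSkip.getUnchecked("region-a")); // true until TTL/invalidate
        canSkip.invalidate("region-a");                       // REGION_CLOSE-style reset
        System.out.println(canSkip.getUnchecked("region-a")); // false again
      }
    }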

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 29808e9..ad08866 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 57e54d7..3a7f77b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -82,7 +83,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public String getPeerClusterId() {
+  public String getPeerId() {
     String[] parts = peerClusterId.split("-", 2);
     return parts.length != 1 ?
         parts[0] : peerClusterId;
@@ -98,4 +99,37 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
       throws ReplicationException {
     return;
   }
+
+  @Override
+  public boolean isPeerEnabled() {
+    return true;
+  }
+
+  @Override
+  public boolean isSourceActive() {
+    return true;
+  }
+
+  @Override
+  public MetricsSource getSourceMetrics() {
+    return null;
+  }
+
+  @Override
+  public ReplicationEndpoint getReplicationEndpoint() {
+    return null;
+  }
+
+  @Override
+  public ReplicationSourceManager getSourceManager() {
+    return manager;
+  }
+
+  @Override
+  public void tryThrottle(int batchSize) throws InterruptedException {
+  }
+
+  @Override
+  public void postShipEdits(List<Entry> entries, int batchSize) {
+  }
 }
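
The dummy is wired in through configuration. Assuming the new factory still
reads the key that the removed manager code did, a test would install it like
this (a sketch under that assumption):

    Configuration conf = HBaseConfiguration.create();
    conf.set("replication.replicationsource.implementation",
        ReplicationSourceDummy.class.getCanonicalName());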

http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 026f8e4..26aee6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -462,7 +462,7 @@ public abstract class TestReplicationSourceManager {
       // Make sure that the replication source was not initialized
       List<ReplicationSourceInterface> sources = manager.getSources();
       for (ReplicationSourceInterface source : sources) {
-        assertNotEquals("FakePeer", source.getPeerClusterId());
+        assertNotEquals("FakePeer", source.getPeerId());
       }
 
       // Create a replication queue for the fake peer