You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/09 12:59:58 UTC

[25/40] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface

HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1e36a84a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1e36a84a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1e36a84a

Branch: refs/heads/branch-2
Commit: 1e36a84afc388987592f1ec003bfe6a665da4ffd
Parents: 4c6942d
Author: huzheng <op...@gmail.com>
Authored: Tue Dec 26 16:46:10 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Mar 9 20:55:48 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfigUtil.java  |  10 +-
 .../replication/VerifyReplication.java          |   9 +-
 .../hbase/replication/ReplicationFactory.java   |  10 +-
 .../hbase/replication/ReplicationPeerImpl.java  |  60 +-
 .../replication/ReplicationPeerStorage.java     |   3 +-
 .../hbase/replication/ReplicationPeers.java     | 238 ++++----
 .../replication/ReplicationPeersZKImpl.java     | 552 -------------------
 .../replication/ZKReplicationPeerStorage.java   |  12 +-
 .../replication/ZKReplicationStorageBase.java   |   3 +-
 .../replication/TestReplicationStateBasic.java  | 125 ++---
 .../replication/TestReplicationStateZKImpl.java |   2 +-
 .../TestZKReplicationPeerStorage.java           |  12 +-
 .../cleaner/ReplicationZKNodeCleaner.java       |  57 +-
 .../replication/ReplicationPeerManager.java     |   6 +-
 .../regionserver/DumpReplicationQueues.java     |   2 +-
 .../regionserver/PeerProcedureHandlerImpl.java  |  49 +-
 .../replication/regionserver/Replication.java   |   2 +-
 .../regionserver/ReplicationSource.java         |   7 +-
 .../regionserver/ReplicationSourceManager.java  |  44 +-
 .../cleaner/TestReplicationHFileCleaner.java    |   7 +-
 .../replication/TestMultiSlaveReplication.java  |   2 -
 .../TestReplicationTrackerZKImpl.java           |  26 +-
 .../TestReplicationSourceManager.java           |  17 +-
 .../hadoop/hbase/HBaseZKTestingUtility.java     |   3 +-
 24 files changed, 307 insertions(+), 951 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index 022bf64..a234a9b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -247,22 +247,22 @@ public final class ReplicationPeerConfigUtil {
   public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
       throws DeserializationException {
     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
+      int pbLen = ProtobufUtil.lengthOfPBMagic();
       ReplicationProtos.ReplicationPeer.Builder builder =
           ReplicationProtos.ReplicationPeer.newBuilder();
       ReplicationProtos.ReplicationPeer peer;
       try {
-        ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
         peer = builder.build();
       } catch (IOException e) {
         throw new DeserializationException(e);
       }
       return convert(peer);
     } else {
-      if (bytes.length > 0) {
-        return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
+      if (bytes == null || bytes.length <= 0) {
+        throw new DeserializationException("Bytes to deserialize should not be empty.");
       }
-      return ReplicationPeerConfig.newBuilder().setClusterKey("").build();
+      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 09d4b4b..f0070f0 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -339,15 +339,10 @@ public class VerifyReplication extends Configured implements Tool {
             @Override public boolean isAborted() {return false;}
           });
 
-      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
+      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
       rp.init();
 
-      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
-      if (pair == null) {
-        throw new IOException("Couldn't get peer conf!");
-      }
-
-      return pair;
+      return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
     } catch (ReplicationException e) {
       throw new IOException(
           "An error occurred while trying to connect to the remove peer cluster", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 5e70e57..6c66aff 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -29,14 +29,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class ReplicationFactory {
 
-  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
-      Abortable abortable) {
-    return getReplicationPeers(zk, conf, null, abortable);
-  }
-
-  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
-      ReplicationQueueStorage queueStorage, Abortable abortable) {
-    return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
+  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
+    return new ReplicationPeers(zk, conf);
   }
 
   public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 2c7ea9b..3e17025 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -18,28 +18,16 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 
 @InterfaceAudience.Private
 public class ReplicationPeerImpl implements ReplicationPeer {
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);
-
-  private final ReplicationPeerStorage peerStorage;
-
   private final Configuration conf;
 
   private final String id;
@@ -57,21 +45,21 @@ public class ReplicationPeerImpl implements ReplicationPeer {
    * @param id string representation of this peer's identifier
    * @param peerConfig configuration for the replication peer
    */
-  public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id,
+  public ReplicationPeerImpl(Configuration conf, String id, boolean peerState,
       ReplicationPeerConfig peerConfig) {
-    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
     this.conf = conf;
-    this.peerConfig = peerConfig;
     this.id = id;
+    this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
+    this.peerConfig = peerConfig;
     this.peerConfigListeners = new ArrayList<>();
   }
 
-  public void refreshPeerState() throws ReplicationException {
-    this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED;
+  void setPeerState(boolean enabled) {
+    this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
   }
 
-  public void refreshPeerConfig() throws ReplicationException {
-    this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig);
+  void setPeerConfig(ReplicationPeerConfig peerConfig) {
+    this.peerConfig = peerConfig;
     peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
   }
 
@@ -134,36 +122,4 @@ public class ReplicationPeerImpl implements ReplicationPeer {
   public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
     this.peerConfigListeners.add(listener);
   }
-
-  /**
-   * Parse the raw data from ZK to get a peer's state
-   * @param bytes raw ZK data
-   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
-   * @throws DeserializationException
-   */
-  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
-    ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
-    return ReplicationProtos.ReplicationState.State.ENABLED == state;
-  }
-
-  /**
-   * @param bytes Content of a state znode.
-   * @return State parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
-      throws DeserializationException {
-    ProtobufUtil.expectPBMagicPrefix(bytes);
-    int pbLen = ProtobufUtil.lengthOfPBMagic();
-    ReplicationProtos.ReplicationState.Builder builder =
-        ReplicationProtos.ReplicationState.newBuilder();
-    ReplicationProtos.ReplicationState state;
-    try {
-      ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
-      state = builder.build();
-      return state.getState();
-    } catch (IOException e) {
-      throw new DeserializationException(e);
-    }
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
index e00cd0d..1adda02 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
-import java.util.Optional;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -70,5 +69,5 @@ public interface ReplicationPeerStorage {
    * Get the peer config of a replication peer.
    * @throws ReplicationException if there are errors accessing the storage service.
    */
-  Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException;
+  ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index afc19bd..e58482e 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,58 +17,53 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.io.IOException;
 import java.util.Set;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
- * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
- * clusters that data is replicated to. A peer cluster can be in three different states:
- *
- * 1. Not-Registered - There is no notion of the peer cluster.
- * 2. Registered - The peer has an id and is being tracked but there is no connection.
- * 3. Connected - There is an active connection to the remote peer.
- *
- * In the registered or connected state, a peer cluster can either be enabled or disabled.
+ * This provides a class for maintaining a set of peer clusters. These peers are remote slave
+ * clusters that data is replicated to.
  */
 @InterfaceAudience.Private
-public interface ReplicationPeers {
+public class ReplicationPeers {
 
-  /**
-   * Initialize the ReplicationPeers interface.
-   */
-  void init() throws ReplicationException;
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
 
-  /**
-   * Add a new remote slave cluster for replication.
-   * @param peerId a short that identifies the cluster
-   * @param peerConfig configuration for the replication slave cluster
-   */
-  default void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
-      throws ReplicationException {
-    registerPeer(peerId, peerConfig, true);
+  private final Configuration conf;
+
+  // Map of peer clusters keyed by their id
+  private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
+  private final ReplicationPeerStorage peerStorage;
+
+  protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+    this.conf = conf;
+    this.peerCache = new ConcurrentHashMap<>();
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
   }
 
-  /**
-   * Add a new remote slave cluster for replication.
-   * @param peerId a short that identifies the cluster
-   * @param peerConfig configuration for the replication slave cluster
-   * @param enabled peer state, true if ENABLED and false if DISABLED
-   */
-  void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
-      throws ReplicationException;
+  public void init() throws ReplicationException {
+    // Loading all existing peerIds into peer cache.
+    for (String peerId : this.peerStorage.listPeerIds()) {
+      addPeer(peerId);
+    }
+  }
 
-  /**
-   * Removes a remote slave cluster and stops the replication to it.
-   * @param peerId a short that identifies the cluster
-   */
-  void unregisterPeer(String peerId) throws ReplicationException;
+  @VisibleForTesting
+  public ReplicationPeerStorage getPeerStorage() {
+    return this.peerStorage;
+  }
 
   /**
    * Method called after a peer has been connected. It will create a ReplicationPeer to track the
@@ -78,111 +72,115 @@ public interface ReplicationPeers {
    * @return whether a ReplicationPeer was successfully created
    * @throws ReplicationException
    */
-  boolean peerConnected(String peerId) throws ReplicationException;
-
-  /**
-   * Method called after a peer has been disconnected. It will remove the ReplicationPeer that
-   * tracked the disconnected cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void peerDisconnected(String peerId);
+  public boolean addPeer(String peerId) throws ReplicationException {
+    if (this.peerCache.containsKey(peerId)) {
+      return false;
+    }
 
-  /**
-   * Restart the replication to the specified remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void enablePeer(String peerId) throws ReplicationException;
-
-  /**
-   * Stop the replication to the specified remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void disablePeer(String peerId) throws ReplicationException;
+    peerCache.put(peerId, createPeer(peerId));
+    return true;
+  }
 
-  /**
-   * Get the table and column-family list string of the peer from the underlying storage.
-   * @param peerId a short that identifies the cluster
-   */
-  public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
-      throws ReplicationException;
+  public void removePeer(String peerId) {
+    peerCache.remove(peerId);
+  }
 
   /**
-   * Set the table and column-family list string of the peer to the underlying storage.
+   * Get the peer state for the specified connected remote slave cluster. The value might be read
+   * from cache, so it is recommended to use {@link #peerStorage } to read storage directly if
+   * reading the state after enabling or disabling it.
    * @param peerId a short that identifies the cluster
-   * @param tableCFs the table and column-family list which will be replicated for this peer
+   * @return true if replication is enabled, false otherwise.
    */
-  public void setPeerTableCFsConfig(String peerId,
-                                    Map<TableName, ? extends Collection<String>>  tableCFs)
-      throws ReplicationException;
+  public boolean isPeerEnabled(String peerId) {
+    ReplicationPeer replicationPeer = this.peerCache.get(peerId);
+    if (replicationPeer == null) {
+      throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
+    }
+    return replicationPeer.getPeerState() == PeerState.ENABLED;
+  }
 
   /**
-   * Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will
-   * continue to track changes to the Peer's state and config. This method returns null if no
-   * peer has been connected with the given peerId.
+   * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
+   * continue to track changes to the Peer's state and config. This method returns null if no peer
+   * has been cached with the given peerId.
    * @param peerId id for the peer
    * @return ReplicationPeer object
    */
-  ReplicationPeerImpl getConnectedPeer(String peerId);
+  public ReplicationPeerImpl getPeer(String peerId) {
+    return peerCache.get(peerId);
+  }
 
   /**
    * Returns the set of peerIds of the clusters that have been connected and have an underlying
    * ReplicationPeer.
    * @return a Set of Strings for peerIds
    */
-  public Set<String> getConnectedPeerIds();
+  public Set<String> getAllPeerIds() {
+    return peerCache.keySet();
+  }
 
-  /**
-   * Get the replication status for the specified connected remote slave cluster.
-   * The value might be read from cache, so it is recommended to
-   * use {@link #getStatusOfPeerFromBackingStore(String)}
-   * if reading the state after enabling or disabling it.
-   * @param peerId a short that identifies the cluster
-   * @return true if replication is enabled, false otherwise.
-   */
-  boolean getStatusOfPeer(String peerId);
+  public ReplicationPeerConfig getPeerConfig(String peerId) {
+    ReplicationPeer replicationPeer = this.peerCache.get(peerId);
+    if (replicationPeer == null) {
+      throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
+    }
+    return replicationPeer.getPeerConfig();
+  }
 
-  /**
-   * Get the replication status for the specified remote slave cluster, which doesn't
-   * have to be connected. The state is read directly from the backing store.
-   * @param peerId a short that identifies the cluster
-   * @return true if replication is enabled, false otherwise.
-   * @throws ReplicationException thrown if there's an error contacting the store
-   */
-  boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
+  public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException {
+    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
 
-  /**
-   * List the cluster replication configs of all remote slave clusters (whether they are
-   * enabled/disabled or connected/disconnected).
-   * @return A map of peer ids to peer cluster keys
-   */
-  Map<String, ReplicationPeerConfig> getAllPeerConfigs();
+    Configuration otherConf;
+    try {
+      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
+    } catch (IOException e) {
+      throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e);
+    }
 
-  /**
-   * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
-   * connected/disconnected).
-   * @return A list of peer ids
-   */
-  List<String> getAllPeerIds();
+    if (!peerConfig.getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(otherConf);
+      compound.addStringMap(peerConfig.getConfiguration());
+      return compound;
+    }
 
-  /**
-   * Returns the configured ReplicationPeerConfig for this peerId
-   * @param peerId a short name that identifies the cluster
-   * @return ReplicationPeerConfig for the peer
-   */
-  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
+    return otherConf;
+  }
 
-  /**
-   * Returns the configuration needed to talk to the remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return the configuration for the peer cluster, null if it was unable to get the configuration
-   */
-  Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
+  public PeerState refreshPeerState(String peerId) throws ReplicationException {
+    ReplicationPeerImpl peer = peerCache.get(peerId);
+    if (peer == null) {
+      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+    }
+    peer.setPeerState(peerStorage.isPeerEnabled(peerId));
+    return peer.getPeerState();
+  }
+
+  public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
+    ReplicationPeerImpl peer = peerCache.get(peerId);
+    if (peer == null) {
+      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+    }
+    peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
+    return peer.getPeerConfig();
+  }
 
   /**
+   *
    * Update the peerConfig for the a given peer cluster
    * @param id a short that identifies the cluster
    * @param peerConfig new config for the peer cluster
    * @throws ReplicationException
-   */
-  void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
+   *
+   * Helper method to connect to a peer
+   * @param peerId peer's identifier
+   * @return object representing the peer
+   *
+   */
+  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
+    ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
+    boolean enabled = peerStorage.isPeerEnabled(peerId);
+    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
deleted file mode 100644
index 4d040e9..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
- * peers znode contains a list of all peer replication clusters and the current replication state of
- * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
- * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
- * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
- * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
- * For example:
- *
- *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
- *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
- *
- * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
- * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
- * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
- * ReplicationPeer.PeerStateTracker class. For example:
- *
- * /hbase/replication/peers/1/peer-state [Value: ENABLED]
- *
- * Each of these peer znodes has a child znode that indicates which data will be replicated
- * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
- * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
- * class. For example:
- *
- * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
- */
-@InterfaceAudience.Private
-public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
-
-  // Map of peer clusters keyed by their id
-  private ConcurrentMap<String, ReplicationPeerImpl> peerClusters;
-  private final ReplicationQueueStorage queueStorage;
-  private Abortable abortable;
-
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
-
-  public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf,
-      ReplicationQueueStorage queueStorage, Abortable abortable) {
-    super(zk, conf, abortable);
-    this.abortable = abortable;
-    this.peerClusters = new ConcurrentHashMap<>();
-    this.queueStorage = queueStorage;
-  }
-
-  @Override
-  public void init() throws ReplicationException {
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
-        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not initialize replication peers", e);
-    }
-    addExistingPeers();
-  }
-
-  @Override
-  public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
-      throws ReplicationException {
-    try {
-      if (peerExists(id)) {
-        throw new IllegalArgumentException("Cannot add a peer with id=" + id
-            + " because that id already exists.");
-      }
-
-      if(id.contains("-")){
-        throw new IllegalArgumentException("Found invalid peer name:" + id);
-      }
-
-      if (peerConfig.getClusterKey() != null) {
-        try {
-          ZKConfig.validateClusterKey(peerConfig.getClusterKey());
-        } catch (IOException ioe) {
-          throw new IllegalArgumentException(ioe.getMessage());
-        }
-      }
-
-      checkQueuesDeleted(id);
-
-      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-
-      List<ZKUtilOp> listOfOps = new ArrayList<>(2);
-      ZKUtilOp op1 =
-          ZKUtilOp.createAndFailSilent(getPeerNode(id),
-            ReplicationPeerConfigUtil.toByteArray(peerConfig));
-      ZKUtilOp op2 =
-          ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
-              : DISABLED_ZNODE_BYTES);
-      listOfOps.add(op1);
-      listOfOps.add(op2);
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
-          + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
-    }
-  }
-
-  @Override
-  public void unregisterPeer(String id) throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot remove peer with id=" + id
-            + " because that id does not exist.");
-      }
-      ZKUtil.deleteNodeRecursively(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not remove peer with id=" + id, e);
-    }
-  }
-
-  @Override
-  public void enablePeer(String id) throws ReplicationException {
-    changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
-    LOG.info("peer " + id + " is enabled");
-  }
-
-  @Override
-  public void disablePeer(String id) throws ReplicationException {
-    changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
-    LOG.info("peer " + id + " is disabled");
-  }
-
-  @Override
-  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("peer " + id + " doesn't exist");
-      }
-      try {
-        ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
-        if (rpc == null) {
-          throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
-        }
-        return rpc.getTableCFsMap();
-      } catch (Exception e) {
-        throw new ReplicationException(e);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
-    }
-  }
-
-  @Override
-  public void setPeerTableCFsConfig(String id,
-                                    Map<TableName, ? extends Collection<String>>  tableCFs)
-      throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
-            + " does not exist.");
-      }
-      ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
-      if (rpc == null) {
-        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
-      }
-      rpc.setTableCFsMap(tableCFs);
-      ZKUtil.setData(this.zookeeper, getPeerNode(id),
-          ReplicationPeerConfigUtil.toByteArray(rpc));
-      LOG.info("Peer tableCFs with id= " + id + " is now " +
-        ReplicationPeerConfigUtil.convertToString(tableCFs));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
-    }
-  }
-
-  @Override
-  public boolean getStatusOfPeer(String id) {
-    ReplicationPeer replicationPeer = this.peerClusters.get(id);
-    if (replicationPeer == null) {
-      throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
-    }
-    return replicationPeer.getPeerState() == PeerState.ENABLED;
-  }
-
-  @Override
-  public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("peer " + id + " doesn't exist");
-      }
-      String peerStateZNode = getPeerStateNode(id);
-      try {
-        return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
-      } catch (KeeperException e) {
-        throw new ReplicationException(e);
-      } catch (DeserializationException e) {
-        throw new ReplicationException(e);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to get status of the peer with id=" + id +
-          " from backing store", e);
-    } catch (InterruptedException e) {
-      throw new ReplicationException(e);
-    }
-  }
-
-  @Override
-  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
-    Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
-    List<String> ids = null;
-    try {
-      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-      for (String id : ids) {
-        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
-        if (peerConfig == null) {
-          LOG.warn("Failed to get replication peer configuration of clusterid=" + id
-            + " znode content, continuing.");
-          continue;
-        }
-        peers.put(id, peerConfig);
-      }
-    } catch (KeeperException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    } catch (ReplicationException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    }
-    return peers;
-  }
-
-  @Override
-  public ReplicationPeerImpl getConnectedPeer(String peerId) {
-    return peerClusters.get(peerId);
-  }
-
-  @Override
-  public Set<String> getConnectedPeerIds() {
-    return peerClusters.keySet(); // this is not thread-safe
-  }
-
-  /**
-   * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
-   */
-  @Override
-  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
-      throws ReplicationException {
-    String znode = getPeerNode(peerId);
-    byte[] data = null;
-    try {
-      data = ZKUtil.getData(this.zookeeper, znode);
-    } catch (InterruptedException e) {
-      LOG.warn("Could not get configuration for peer because the thread " +
-          "was interrupted. peerId=" + peerId);
-      Thread.currentThread().interrupt();
-      return null;
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error getting configuration for peer with id="
-          + peerId, e);
-    }
-    if (data == null) {
-      LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
-      return null;
-    }
-
-    try {
-      return ReplicationPeerConfigUtil.parsePeerFrom(data);
-    } catch (DeserializationException e) {
-      LOG.warn("Failed to parse cluster key from peerId=" + peerId
-          + ", specifically the content from the following znode: " + znode);
-      return null;
-    }
-  }
-
-  @Override
-  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
-      throws ReplicationException {
-    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
-
-    if (peerConfig == null) {
-      return null;
-    }
-
-    Configuration otherConf;
-    try {
-      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
-    } catch (IOException e) {
-      LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
-      return null;
-    }
-
-    if (!peerConfig.getConfiguration().isEmpty()) {
-      CompoundConfiguration compound = new CompoundConfiguration();
-      compound.add(otherConf);
-      compound.addStringMap(peerConfig.getConfiguration());
-      return new Pair<>(peerConfig, compound);
-    }
-
-    return new Pair<>(peerConfig, otherConf);
-  }
-
-  @Override
-  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
-      throws ReplicationException {
-    ReplicationPeer peer = getConnectedPeer(id);
-    if (peer == null){
-      throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
-    }
-    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
-    if (!isStringEquals(newConfig.getClusterKey(), existingConfig.getClusterKey())) {
-      throw new ReplicationException(
-          "Changing the cluster key on an existing peer is not allowed." + " Existing key '" +
-              existingConfig.getClusterKey() + "' does not match new key '" +
-              newConfig.getClusterKey() + "'");
-    }
-    if (!isStringEquals(newConfig.getReplicationEndpointImpl(),
-      existingConfig.getReplicationEndpointImpl())) {
-      throw new ReplicationException("Changing the replication endpoint implementation class " +
-          "on an existing peer is not allowed. Existing class '" +
-          existingConfig.getReplicationEndpointImpl() + "' does not match new class '" +
-          newConfig.getReplicationEndpointImpl() + "'");
-    }
-
-    // Update existingConfig's peer config and peer data with the new values, but don't touch config
-    // or data that weren't explicitly changed
-    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig);
-    builder.putAllConfiguration(newConfig.getConfiguration())
-        .putAllPeerData(newConfig.getPeerData())
-        .setReplicateAllUserTables(newConfig.replicateAllUserTables())
-        .setNamespaces(newConfig.getNamespaces()).setTableCFsMap(newConfig.getTableCFsMap())
-        .setExcludeNamespaces(newConfig.getExcludeNamespaces())
-        .setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap())
-        .setBandwidth(newConfig.getBandwidth());
-
-    try {
-      ZKUtil.setData(this.zookeeper, getPeerNode(id),
-          ReplicationPeerConfigUtil.toByteArray(builder.build()));
-    }
-    catch(KeeperException ke){
-      throw new ReplicationException("There was a problem trying to save changes to the " +
-          "replication peer " + id, ke);
-    }
-  }
-
-  /**
-   * List all registered peer clusters and set a watch on their znodes.
-   */
-  @Override
-  public List<String> getAllPeerIds() {
-    List<String> ids = null;
-    try {
-      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    }
-    return ids;
-  }
-
-  /**
-   * A private method used during initialization. This method attempts to add all registered
-   * peer clusters. This method does not set a watch on the peer cluster znodes.
-   */
-  private void addExistingPeers() throws ReplicationException {
-    List<String> znodes = null;
-    try {
-      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error getting the list of peer clusters.", e);
-    }
-    if (znodes != null) {
-      for (String z : znodes) {
-        createAndAddPeer(z);
-      }
-    }
-  }
-
-  @Override
-  public boolean peerConnected(String peerId) throws ReplicationException {
-    return createAndAddPeer(peerId);
-  }
-
-  @Override
-  public void peerDisconnected(String peerId) {
-    ReplicationPeer rp = this.peerClusters.get(peerId);
-    if (rp != null) {
-      peerClusters.remove(peerId, rp);
-    }
-  }
-
-  /**
-   * Attempt to connect to a new remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return true if a new connection was made, false if no new connection was made.
-   */
-  public boolean createAndAddPeer(String peerId) throws ReplicationException {
-    if (peerClusters == null) {
-      return false;
-    }
-    if (this.peerClusters.containsKey(peerId)) {
-      return false;
-    }
-
-    ReplicationPeerImpl peer = null;
-    try {
-      peer = createPeer(peerId);
-    } catch (Exception e) {
-      throw new ReplicationException("Error adding peer with id=" + peerId, e);
-    }
-    if (peer == null) {
-      return false;
-    }
-    ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer);
-    if (previous == null) {
-      LOG.info("Added peer cluster=" + peer.getPeerConfig().getClusterKey());
-    } else {
-      LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
-        ", new cluster=" + peer.getPeerConfig().getClusterKey());
-    }
-    return true;
-  }
-
-  /**
-   * Update the state znode of a peer cluster.
-   * @param id
-   * @param state
-   */
-  private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
-      throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
-            + " does not exist.");
-      }
-      String peerStateZNode = getPeerStateNode(id);
-      byte[] stateBytes =
-          (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
-              : DISABLED_ZNODE_BYTES;
-      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
-        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
-      } else {
-        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
-      }
-      LOG.info("Peer with id= " + id + " is now " + state.name());
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
-    }
-  }
-
-  /**
-   * Helper method to connect to a peer
-   * @param peerId peer's identifier
-   * @return object representing the peer
-   * @throws ReplicationException
-   */
-  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
-    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
-    if (pair == null) {
-      return null;
-    }
-    Configuration peerConf = pair.getSecond();
-
-    ReplicationPeerImpl peer =
-        new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst());
-
-    // Load peer state and peer config by reading zookeeper directly.
-    peer.refreshPeerState();
-    peer.refreshPeerConfig();
-
-    return peer;
-  }
-
-  private void checkQueuesDeleted(String peerId) throws ReplicationException {
-    if (queueStorage == null) {
-      return;
-    }
-    try {
-      List<ServerName> replicators = queueStorage.getListOfReplicators();
-      if (replicators == null || replicators.isEmpty()) {
-        return;
-      }
-      for (ServerName replicator : replicators) {
-        List<String> queueIds = queueStorage.getAllQueues(replicator);
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          if (queueInfo.getPeerId().equals(peerId)) {
-            throw new IllegalArgumentException("undeleted queue for peerId: " + peerId
-                + ", replicator: " + replicator + ", queueId: " + queueId);
-          }
-        }
-      }
-      // Check for hfile-refs queue
-      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
-          && queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
-        throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
-            + ", found in hfile-refs node path " + hfileRefsZNode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
-    }
-  }
-
-  /**
-   * For replication peer cluster key or endpoint class, null and empty string is same. So here
-   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
-   */
-  private boolean isStringEquals(String s1, String s2) {
-    if (StringUtils.isBlank(s1)) {
-      return StringUtils.isBlank(s2);
-    }
-    return s1.equals(s2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 49af4c3..bf448e8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
@@ -144,7 +143,7 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
   }
 
   @Override
-  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException {
+  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
     byte[] data;
     try {
       data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
@@ -152,13 +151,14 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
       throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
     }
     if (data == null || data.length == 0) {
-      return Optional.empty();
+      throw new ReplicationException(
+          "Replication peer config data shouldn't be empty, peerId=" + peerId);
     }
     try {
-      return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data));
+      return ReplicationPeerConfigUtil.parsePeerFrom(data);
     } catch (DeserializationException e) {
-      LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e);
-      return Optional.empty();
+      throw new ReplicationException(
+          "Failed to parse replication peer config for peer with id=" + peerId, e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index b8a2044..d09a56b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -48,8 +48,7 @@ class ZKReplicationStorageBase {
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
 
     this.replicationZNode =
-      ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
-
+        ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 4afda5d..2589199 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -55,7 +55,6 @@ public abstract class TestReplicationStateBasic {
   protected static String KEY_TWO;
 
   // For testing when we try to replicate to ourself
-  protected String OUR_ID = "3";
   protected String OUR_KEY;
 
   protected static int zkTimeoutCount;
@@ -152,37 +151,6 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
-    rp.init();
-
-    try {
-      rp.registerPeer(ID_ONE,
-        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
-      fail("Should throw an IllegalArgumentException because " +
-        "zookeeper.znode.parent is missing leading '/'.");
-    } catch (IllegalArgumentException e) {
-      // Expected.
-    }
-
-    try {
-      rp.registerPeer(ID_ONE,
-        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
-      fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
-    } catch (IllegalArgumentException e) {
-      // Expected.
-    }
-
-    try {
-      rp.registerPeer(ID_ONE,
-        new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
-      fail("Should throw an IllegalArgumentException because " +
-        "hbase.zookeeper.property.clientPort is missing.");
-    } catch (IllegalArgumentException e) {
-      // Expected.
-    }
-  }
-
-  @Test
   public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
     rp.init();
 
@@ -192,7 +160,8 @@ public abstract class TestReplicationStateBasic {
     files1.add(new Pair<>(null, new Path("file_3")));
     assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
     assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
-    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.getPeerStorage().addPeer(ID_ONE,
+            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
     rqs.addPeerToHFileRefs(ID_ONE);
     rqs.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
@@ -208,15 +177,17 @@ public abstract class TestReplicationStateBasic {
     hfiles2.add(removedString);
     rqs.removeHFileRefs(ID_ONE, hfiles2);
     assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
-    rp.unregisterPeer(ID_ONE);
+    rp.getPeerStorage().removePeer(ID_ONE);
   }
 
   @Test
   public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
     rp.init();
-    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.getPeerStorage().addPeer(ID_ONE,
+      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
     rqs.addPeerToHFileRefs(ID_ONE);
-    rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+    rp.getPeerStorage().addPeer(ID_TWO,
+      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
     rqs.addPeerToHFileRefs(ID_TWO);
 
     List<Pair<Path, Path>> files1 = new ArrayList<>(3);
@@ -229,13 +200,13 @@ public abstract class TestReplicationStateBasic {
     assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
     assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
 
-    rp.unregisterPeer(ID_ONE);
+    rp.getPeerStorage().removePeer(ID_ONE);
     rqs.removePeerFromHFileRefs(ID_ONE);
     assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
     assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
     assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
 
-    rp.unregisterPeer(ID_TWO);
+    rp.getPeerStorage().removePeer(ID_TWO);
     rqs.removePeerFromHFileRefs(ID_TWO);
     assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
     assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
@@ -245,74 +216,77 @@ public abstract class TestReplicationStateBasic {
   public void testReplicationPeers() throws Exception {
     rp.init();
 
-    // Test methods with non-existent peer ids
     try {
-      rp.unregisterPeer("bogus");
+      rp.getPeerStorage().setPeerState("bogus", true);
       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
-    } catch (IllegalArgumentException e) {
+    } catch (ReplicationException e) {
     }
     try {
-      rp.enablePeer("bogus");
+      rp.getPeerStorage().setPeerState("bogus", false);
       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
-    } catch (IllegalArgumentException e) {
+    } catch (ReplicationException e) {
     }
     try {
-      rp.disablePeer("bogus");
+      rp.isPeerEnabled("bogus");
       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
     } catch (IllegalArgumentException e) {
     }
+
     try {
-      rp.getStatusOfPeer("bogus");
-      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
-    } catch (IllegalArgumentException e) {
+      assertFalse(rp.addPeer("bogus"));
+      fail("Should have thrown an ReplicationException when passed a bogus peerId");
+    } catch (ReplicationException e) {
+    }
+
+    try {
+      assertNull(rp.getPeerClusterConfiguration("bogus"));
+      fail("Should have thrown an ReplicationException when passed a bogus peerId");
+    } catch (ReplicationException e) {
     }
-    assertFalse(rp.peerConnected("bogus"));
-    rp.peerDisconnected("bogus");
 
-    assertNull(rp.getPeerConf("bogus"));
     assertNumberOfPeers(0);
 
     // Add some peers
-    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
     assertNumberOfPeers(1);
-    rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+    rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
     assertNumberOfPeers(2);
 
     // Test methods with a peer that is added but not connected
     try {
-      rp.getStatusOfPeer(ID_ONE);
+      rp.isPeerEnabled(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
-    rp.unregisterPeer(ID_ONE);
-    rp.peerDisconnected(ID_ONE);
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
+    rp.getPeerStorage().removePeer(ID_ONE);
+    rp.removePeer(ID_ONE);
     assertNumberOfPeers(1);
 
     // Add one peer
-    rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    rp.peerConnected(ID_ONE);
+    rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+    rp.addPeer(ID_ONE);
     assertNumberOfPeers(2);
-    assertTrue(rp.getStatusOfPeer(ID_ONE));
-    rp.disablePeer(ID_ONE);
+    assertTrue(rp.isPeerEnabled(ID_ONE));
+    rp.getPeerStorage().setPeerState(ID_ONE, false);
     // now we do not rely on zk watcher to trigger the state change so we need to trigger it
     // manually...
-    ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE);
-    peer.refreshPeerState();
+    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
+    rp.refreshPeerState(peer.getId());
     assertEquals(PeerState.DISABLED, peer.getPeerState());
     assertConnectedPeerStatus(false, ID_ONE);
-    rp.enablePeer(ID_ONE);
+    rp.getPeerStorage().setPeerState(ID_ONE, true);
     // now we do not rely on zk watcher to trigger the state change so we need to trigger it
     // manually...
-    peer.refreshPeerState();
+    rp.refreshPeerState(peer.getId());
     assertEquals(PeerState.ENABLED, peer.getPeerState());
     assertConnectedPeerStatus(true, ID_ONE);
 
     // Disconnect peer
-    rp.peerDisconnected(ID_ONE);
+    rp.removePeer(ID_ONE);
     assertNumberOfPeers(2);
     try {
-      rp.getStatusOfPeer(ID_ONE);
+      rp.isPeerEnabled(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
@@ -320,16 +294,16 @@ public abstract class TestReplicationStateBasic {
 
   protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
     // we can first check if the value was changed in the store, if it wasn't then fail right away
-    if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
+    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
       fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
     }
     while (true) {
-      if (status == rp.getStatusOfPeer(peerId)) {
+      if (status == rp.isPeerEnabled(peerId)) {
         return;
       }
       if (zkTimeoutCount < ZK_MAX_COUNT) {
-        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status +
-          ", sleeping and trying again.");
+        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
+            + ", sleeping and trying again.");
         Thread.sleep(ZK_SLEEP_INTERVAL);
       } else {
         fail("Timed out waiting for ConnectedPeerStatus to be " + status);
@@ -337,10 +311,8 @@ public abstract class TestReplicationStateBasic {
     }
   }
 
-  protected void assertNumberOfPeers(int total) {
-    assertEquals(total, rp.getAllPeerConfigs().size());
-    assertEquals(total, rp.getAllPeerIds().size());
-    assertEquals(total, rp.getAllPeerIds().size());
+  protected void assertNumberOfPeers(int total) throws ReplicationException {
+    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
   }
 
   /*
@@ -359,8 +331,9 @@ public abstract class TestReplicationStateBasic {
         rqs.addWAL(server3, "qId" + i, "filename" + j);
       }
       // Add peers for the corresponding queues so they are not orphans
-      rp.registerPeer("qId" + i,
-        new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
+      rp.getPeerStorage().addPeer("qId" + i,
+        ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
+        true);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index e7c8b3b..1830103 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -84,7 +84,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
   public void setUp() {
     zkTimeoutCount = 0;
     rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-    rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable());
+    rp = ReplicationFactory.getReplicationPeers(zkw, conf);
     OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index a3be1e6..e8098c8 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -143,14 +144,14 @@ public class TestZKReplicationPeerStorage {
     assertEquals(peerCount, peerIds.size());
     for (String peerId : peerIds) {
       int seed = Integer.parseInt(peerId);
-      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get());
+      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
     }
     for (int i = 0; i < peerCount; i++) {
       STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
     }
     for (String peerId : peerIds) {
       int seed = Integer.parseInt(peerId);
-      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get());
+      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
     }
     for (int i = 0; i < peerCount; i++) {
       assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
@@ -166,6 +167,11 @@ public class TestZKReplicationPeerStorage {
     peerIds = STORAGE.listPeerIds();
     assertEquals(peerCount - 1, peerIds.size());
     assertFalse(peerIds.contains(toRemove));
-    assertFalse(STORAGE.getPeerConfig(toRemove).isPresent());
+
+    try {
+      STORAGE.getPeerConfig(toRemove);
+      fail("Should throw a ReplicationException when get peer config of a peerId");
+    } catch (ReplicationException e) {
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
index af41399..f2c3ec9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
@@ -51,20 +50,14 @@ import org.slf4j.LoggerFactory;
 public class ReplicationZKNodeCleaner {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
   private final ReplicationQueueStorage queueStorage;
-  private final ReplicationPeers replicationPeers;
+  private final ReplicationPeerStorage peerStorage;
   private final ReplicationQueueDeletor queueDeletor;
 
   public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
       throws IOException {
-    try {
-      this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-      this.replicationPeers =
-          ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable);
-      this.replicationPeers.init();
-      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
-    } catch (ReplicationException e) {
-      throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
-    }
+    this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
+    this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
   }
 
   /**
@@ -73,8 +66,8 @@ public class ReplicationZKNodeCleaner {
    */
   public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
     Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
-    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
     try {
+      Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
       List<ServerName> replicators = this.queueStorage.getListOfReplicators();
       if (replicators == null || replicators.isEmpty()) {
         return undeletedQueues;
@@ -84,8 +77,7 @@ public class ReplicationZKNodeCleaner {
         for (String queueId : queueIds) {
           ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
           if (!peerIds.contains(queueInfo.getPeerId())) {
-            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(
-              queueId);
+            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId);
             if (LOG.isDebugEnabled()) {
               LOG.debug("Undeleted replication queue for removed peer found: "
                   + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
@@ -106,9 +98,9 @@ public class ReplicationZKNodeCleaner {
    */
   public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
     Set<String> undeletedHFileRefsQueue = new HashSet<>();
-    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
     String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
     try {
+      Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
       List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
       Set<String> peers = new HashSet<>(listOfPeers);
       peers.removeAll(peerIds);
@@ -116,15 +108,15 @@ public class ReplicationZKNodeCleaner {
         undeletedHFileRefsQueue.addAll(peers);
       }
     } catch (ReplicationException e) {
-      throw new IOException(
-          "Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e);
+      throw new IOException("Failed to get list of all peers from hfile-refs znode "
+          + hfileRefsZNode, e);
     }
     return undeletedHFileRefsQueue;
   }
 
   private class ReplicationQueueDeletor extends ReplicationStateZKBase {
 
-    public ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
+    ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
       super(zk, conf, abortable);
     }
 
@@ -132,19 +124,20 @@ public class ReplicationZKNodeCleaner {
      * @param replicator The regionserver which has undeleted queue
      * @param queueId The undeleted queue id
      */
-    public void removeQueue(final ServerName replicator, final String queueId) throws IOException {
-      String queueZnodePath = ZNodePaths
-          .joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId);
+    void removeQueue(final ServerName replicator, final String queueId) throws IOException {
+      String queueZnodePath =
+          ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()),
+            queueId);
       try {
         ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
+        if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) {
           ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
-          LOG.info("Successfully removed replication queue, replicator: " + replicator +
-            ", queueId: " + queueId);
+          LOG.info("Successfully removed replication queue, replicator: " + replicator
+              + ", queueId: " + queueId);
         }
-      } catch (KeeperException e) {
-        throw new IOException(
-            "Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId);
+      } catch (ReplicationException | KeeperException e) {
+        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
+            + queueId);
       }
     }
 
@@ -152,17 +145,17 @@ public class ReplicationZKNodeCleaner {
      * @param hfileRefsQueueId The undeleted hfile-refs queue id
      * @throws IOException
      */
-    public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
+    void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
       String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
       try {
-        if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
+        if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) {
           ZKUtil.deleteNodeRecursively(this.zookeeper, node);
           LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
               + hfileRefsZNode);
         }
-      } catch (KeeperException e) {
+      } catch (ReplicationException | KeeperException e) {
         throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
-            + " from path " + hfileRefsZNode);
+            + " from path " + hfileRefsZNode, e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index f4ccce8..b6732d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -314,12 +314,12 @@ public class ReplicationPeerManager {
   public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
       throws ReplicationException {
     ReplicationPeerStorage peerStorage =
-      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+        ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
     ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
     for (String peerId : peerStorage.listPeerIds()) {
-      Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId);
+      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
       boolean enabled = peerStorage.isPeerEnabled(peerId);
-      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get()));
+      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
     }
     return new ReplicationPeerManager(peerStorage,
         ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 73e600e..27bda2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -310,7 +310,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
 
     queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
     replicationPeers =
-        ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
+        ReplicationFactory.getReplicationPeers(zkw, getConf());
     replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
       new WarnOnlyAbortable(), new WarnOnlyStoppable());
     Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 598357c..1efe180 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,9 +19,10 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +31,8 @@ import org.slf4j.LoggerFactory;
 public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
   private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
 
-  private ReplicationSourceManager replicationSourceManager;
+  private final ReplicationSourceManager replicationSourceManager;
+  private final ReentrantLock peersLock = new ReentrantLock();
 
   public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
     this.replicationSourceManager = replicationSourceManager;
@@ -38,45 +40,40 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
 
   @Override
   public void addPeer(String peerId) throws ReplicationException, IOException {
-    replicationSourceManager.addPeer(peerId);
+    peersLock.lock();
+    try {
+      replicationSourceManager.addPeer(peerId);
+    } finally {
+      peersLock.unlock();
+    }
   }
 
   @Override
   public void removePeer(String peerId) throws ReplicationException, IOException {
-    replicationSourceManager.removePeer(peerId);
+    peersLock.lock();
+    try {
+      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
+        replicationSourceManager.removePeer(peerId);
+      }
+    } finally {
+      peersLock.unlock();
+    }
   }
 
   @Override
   public void disablePeer(String peerId) throws ReplicationException, IOException {
-    ReplicationPeerImpl peer =
-        replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
-    if (peer != null) {
-      peer.refreshPeerState();
-      LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
-    } else {
-      throw new ReplicationException("No connected peer found, peerId=" + peerId);
-    }
+    PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+    LOG.info("disable replication peer, id: " + peerId + ", new state: " + newState);
   }
 
   @Override
   public void enablePeer(String peerId) throws ReplicationException, IOException {
-    ReplicationPeerImpl peer =
-        replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
-    if (peer != null) {
-      peer.refreshPeerState();
-      LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
-    } else {
-      throw new ReplicationException("No connected peer found, peerId=" + peerId);
-    }
+    PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+    LOG.info("enable replication peer, id: " + peerId + ", new state: " + newState);
   }
 
   @Override
   public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
-    ReplicationPeerImpl peer =
-        replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
-    if (peer == null) {
-      throw new ReplicationException("No connected peer found, peerId=" + peerId);
-    }
-    peer.refreshPeerConfig();
+    replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 9296469..2a4e772 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -107,7 +107,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
       this.queueStorage =
           ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
       this.replicationPeers =
-          ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
+          ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
       this.replicationPeers.init();
       this.replicationTracker =
           ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e36a84a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 58ea6ee..8250992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -222,8 +222,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       // A peerId will not have "-" in its name, see HBASE-11394
       peerId = peerClusterZnode.split("-")[0];
     }
-    Map<TableName, List<String>> tableCFMap =
-        replicationPeers.getConnectedPeer(peerId).getTableCFs();
+    Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
     if (tableCFMap != null) {
       List<String> tableCfs = tableCFMap.get(tableName);
       if (tableCFMap.containsKey(tableName)
@@ -371,7 +370,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   }
 
   private long getCurrentBandwidth() {
-    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
+    ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
     long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
     // user can set peer bandwidth to 0 to use default bandwidth
     return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
@@ -416,7 +415,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    */
   @Override
   public boolean isPeerEnabled() {
-    return this.replicationPeers.getStatusOfPeer(this.peerId);
+    return this.replicationPeers.isPeerEnabled(this.peerId);
   }
 
   @Override