You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/27 01:42:07 UTC

[05/17] hbase git commit: HBASE-19573 Rewrite ReplicationPeer with the new replication storage interface

HBASE-19573 Rewrite ReplicationPeer with the new replication storage interface


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7defbdc2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7defbdc2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7defbdc2

Branch: refs/heads/HBASE-19397
Commit: 7defbdc23a727d3541e774b11a21ffcd8632655e
Parents: e889a95
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Dec 26 11:39:34 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Dec 27 09:40:34 2017 +0800

----------------------------------------------------------------------
 .../replication/VerifyReplication.java          |   5 -
 .../hbase/replication/ReplicationPeer.java      |  42 ++--
 .../hbase/replication/ReplicationPeerImpl.java  | 170 ++++++++++++++
 .../replication/ReplicationPeerZKImpl.java      | 233 -------------------
 .../hbase/replication/ReplicationPeers.java     |   4 +-
 .../replication/ReplicationPeersZKImpl.java     |  23 +-
 .../replication/TestReplicationStateBasic.java  |   7 +-
 .../regionserver/PeerProcedureHandlerImpl.java  |  29 +--
 8 files changed, 217 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 01df2bd..da231e6 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -330,7 +329,6 @@ public class VerifyReplication extends Configured implements Tool {
   private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
       final Configuration conf, String peerId) throws IOException {
     ZKWatcher localZKW = null;
-    ReplicationPeerZKImpl peer = null;
     try {
       localZKW = new ZKWatcher(conf, "VerifyReplication",
           new Abortable() {
@@ -351,9 +349,6 @@ public class VerifyReplication extends Configured implements Tool {
       throw new IOException(
           "An error occurred while trying to connect to the remove peer cluster", e);
     } finally {
-      if (peer != null) {
-        peer.close();
-      }
       if (localZKW != null) {
         localZKW.close();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index b66d76d..4846018 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
-
 /**
  * ReplicationPeer manages enabled / disabled state for the peer.
  */
@@ -49,65 +48,52 @@ public interface ReplicationPeer {
   String getId();
 
   /**
-   * Get the peer config object
-   * @return the ReplicationPeerConfig for this peer
-   */
-  public ReplicationPeerConfig getPeerConfig();
-
-  /**
-   * Get the peer config object. if loadFromBackingStore is true, it will load from backing store
-   * directly and update its load peer config. otherwise, just return the local cached peer config.
-   * @return the ReplicationPeerConfig for this peer
-   */
-  public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
-      throws ReplicationException;
-
-  /**
    * Returns the state of the peer by reading local cache.
    * @return the enabled state
    */
   PeerState getPeerState();
 
   /**
-   * Returns the state of peer, if loadFromBackingStore is true, it will load from backing store
-   * directly and update its local peer state. otherwise, just return the local cached peer state.
-   * @return the enabled state
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
    */
-  PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException;
+  ReplicationPeerConfig getPeerConfig();
 
   /**
    * Get the configuration object required to communicate with this peer
    * @return configuration object
    */
-  public Configuration getConfiguration();
+  Configuration getConfiguration();
 
   /**
    * Get replicable (table, cf-list) map of this peer
    * @return the replicable (table, cf-list) map
    */
-  public Map<TableName, List<String>> getTableCFs();
+  Map<TableName, List<String>> getTableCFs();
 
   /**
    * Get replicable namespace set of this peer
    * @return the replicable namespaces set
    */
-  public Set<String> getNamespaces();
+  Set<String> getNamespaces();
 
   /**
    * Get the per node bandwidth upper limit for this peer
    * @return the bandwidth upper limit
    */
-  public long getPeerBandwidth();
+  long getPeerBandwidth();
 
   /**
    * Register a peer config listener to catch the peer config change event.
    * @param listener listener to catch the peer config change event.
    */
-  public void registerPeerConfigListener(ReplicationPeerConfigListener listener);
+  void registerPeerConfigListener(ReplicationPeerConfigListener listener);
 
   /**
-   * Notify all the registered ReplicationPeerConfigListener to update their peer config.
-   * @param newPeerConfig the new peer config.
+   * @deprecated Use {@link #registerPeerConfigListener(ReplicationPeerConfigListener)} instead.
    */
-  public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig);
-}
+  @Deprecated
+  default void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
+    registerPeerConfigListener(listener);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
new file mode 100644
index 0000000..ed9fe14
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
+@InterfaceAudience.Private
+public class ReplicationPeerImpl implements ReplicationPeer {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);
+
+  private final ReplicationPeerStorage peerStorage;
+
+  private final Configuration conf;
+
+  private final String id;
+
+  private volatile ReplicationPeerConfig peerConfig;
+
+  private volatile PeerState peerState;
+
+  private final List<ReplicationPeerConfigListener> peerConfigListeners;
+
+  /**
+   * Constructor that takes all the objects required to communicate with the specified peer, except
+   * for the region server addresses.
+   * @param conf configuration object to this peer
+   * @param id string representation of this peer's identifier
+   * @param peerConfig configuration for the replication peer
+   */
+  public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id,
+      ReplicationPeerConfig peerConfig) {
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
+    this.conf = conf;
+    this.peerConfig = peerConfig;
+    this.id = id;
+    this.peerConfigListeners = new ArrayList<>();
+  }
+
+  public void refreshPeerState() throws ReplicationException {
+    this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED;
+  }
+
+  public void refreshPeerConfig() throws ReplicationException {
+    this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig);
+    peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id (short)
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public PeerState getPeerState() {
+    return peerState;
+  }
+
+  /**
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
+   */
+  @Override
+  public ReplicationPeerConfig getPeerConfig() {
+    return peerConfig;
+  }
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
+   */
+  @Override
+  public Map<TableName, List<String>> getTableCFs() {
+    return this.peerConfig.getTableCFsMap();
+  }
+
+  /**
+   * Get replicable namespace set of this peer
+   * @return the replicable namespaces set
+   */
+  @Override
+  public Set<String> getNamespaces() {
+    return this.peerConfig.getNamespaces();
+  }
+
+  @Override
+  public long getPeerBandwidth() {
+    return this.peerConfig.getBandwidth();
+  }
+
+  @Override
+  public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
+    this.peerConfigListeners.add(listener);
+  }
+
+  /**
+   * Parse the raw data from ZK to get a peer's state
+   * @param bytes raw ZK data
+   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+   * @throws DeserializationException
+   */
+  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
+    return ReplicationProtos.ReplicationState.State.ENABLED == state;
+  }
+
+  /**
+   * @param bytes Content of a state znode.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    int pbLen = ProtobufUtil.lengthOfPBMagic();
+    ReplicationProtos.ReplicationState.Builder builder =
+        ReplicationProtos.ReplicationState.newBuilder();
+    ReplicationProtos.ReplicationState state;
+    try {
+      ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
+      state = builder.build();
+      return state.getState();
+    } catch (IOException e) {
+      throw new DeserializationException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
deleted file mode 100644
index 5d051a0..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@InterfaceAudience.Private
-public class ReplicationPeerZKImpl extends ReplicationStateZKBase
-    implements ReplicationPeer, Abortable, Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class);
-
-  private volatile ReplicationPeerConfig peerConfig;
-  private final String id;
-  private volatile PeerState peerState;
-  private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
-  private final Configuration conf;
-
-  private final List<ReplicationPeerConfigListener> peerConfigListeners;
-
-  /**
-   * Constructor that takes all the objects required to communicate with the specified peer, except
-   * for the region server addresses.
-   * @param conf configuration object to this peer
-   * @param id string representation of this peer's identifier
-   * @param peerConfig configuration for the replication peer
-   */
-  public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, String id,
-      ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException {
-    super(zkWatcher, conf, abortable);
-    this.conf = conf;
-    this.peerConfig = peerConfig;
-    this.id = id;
-    this.peerConfigListeners = new ArrayList<>();
-  }
-
-  private PeerState readPeerState() throws ReplicationException {
-    try {
-      byte[] data = ZKUtil.getData(zookeeper, this.getPeerStateNode(id));
-      this.peerState = isStateEnabled(data) ? PeerState.ENABLED : PeerState.DISABLED;
-    } catch (DeserializationException | KeeperException | InterruptedException e) {
-      throw new ReplicationException("Get and deserialize peer state data from zookeeper failed: ",
-          e);
-    }
-    return this.peerState;
-  }
-
-  private ReplicationPeerConfig readPeerConfig() throws ReplicationException {
-    try {
-      byte[] data = ZKUtil.getData(zookeeper, this.getPeerNode(id));
-      if (data != null) {
-        this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
-      }
-    } catch (DeserializationException | KeeperException | InterruptedException e) {
-      throw new ReplicationException("Get and deserialize peer config date from zookeeper failed: ",
-          e);
-    }
-    return this.peerConfig;
-  }
-
-  @Override
-  public PeerState getPeerState() {
-    return peerState;
-  }
-
-  @Override
-  public PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException {
-    if (loadFromBackingStore) {
-      return readPeerState();
-    } else {
-      return peerState;
-    }
-  }
-
-  /**
-   * Get the identifier of this peer
-   * @return string representation of the id (short)
-   */
-  @Override
-  public String getId() {
-    return id;
-  }
-
-  /**
-   * Get the peer config object
-   * @return the ReplicationPeerConfig for this peer
-   */
-  @Override
-  public ReplicationPeerConfig getPeerConfig() {
-    return peerConfig;
-  }
-
-  @Override
-  public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
-      throws ReplicationException {
-    if (loadFromBackingStore) {
-      return readPeerConfig();
-    } else {
-      return peerConfig;
-    }
-  }
-
-  /**
-   * Get the configuration object required to communicate with this peer
-   * @return configuration object
-   */
-  @Override
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  /**
-   * Get replicable (table, cf-list) map of this peer
-   * @return the replicable (table, cf-list) map
-   */
-  @Override
-  public Map<TableName, List<String>> getTableCFs() {
-    this.tableCFs = peerConfig.getTableCFsMap();
-    return this.tableCFs;
-  }
-
-  /**
-   * Get replicable namespace set of this peer
-   * @return the replicable namespaces set
-   */
-  @Override
-  public Set<String> getNamespaces() {
-    return this.peerConfig.getNamespaces();
-  }
-
-  @Override
-  public long getPeerBandwidth() {
-    return this.peerConfig.getBandwidth();
-  }
-
-  @Override
-  public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
-    this.peerConfigListeners.add(listener);
-  }
-
-  @Override
-  public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig) {
-    for (ReplicationPeerConfigListener listener : this.peerConfigListeners) {
-      listener.peerConfigUpdated(newPeerConfig);
-    }
-  }
-
-  @Override
-  public void abort(String why, Throwable e) {
-    LOG.error(HBaseMarkers.FATAL, "The ReplicationPeer corresponding to peer " +
-        peerConfig + " was aborted for the following reason(s):" + why, e);
-  }
-
-  @Override
-  public boolean isAborted() {
-    // Currently the replication peer is never "Aborted", we just log when the
-    // abort method is called.
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    // TODO: stop zkw?
-  }
-
-  /**
-   * Parse the raw data from ZK to get a peer's state
-   * @param bytes raw ZK data
-   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
-   * @throws DeserializationException
-   */
-  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
-    ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
-    return ReplicationProtos.ReplicationState.State.ENABLED == state;
-  }
-
-  /**
-   * @param bytes Content of a state znode.
-   * @return State parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
-      throws DeserializationException {
-    ProtobufUtil.expectPBMagicPrefix(bytes);
-    int pbLen = ProtobufUtil.lengthOfPBMagic();
-    ReplicationProtos.ReplicationState.Builder builder =
-        ReplicationProtos.ReplicationState.newBuilder();
-    ReplicationProtos.ReplicationState state;
-    try {
-      ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
-      state = builder.build();
-      return state.getState();
-    } catch (IOException e) {
-      throw new DeserializationException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 10936bf..afc19bd 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -116,13 +116,13 @@ public interface ReplicationPeers {
       throws ReplicationException;
 
   /**
-   * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
+   * Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will
    * continue to track changes to the Peer's state and config. This method returns null if no
    * peer has been connected with the given peerId.
    * @param peerId id for the peer
    * @return ReplicationPeer object
    */
-  ReplicationPeer getConnectedPeer(String peerId);
+  ReplicationPeerImpl getConnectedPeer(String peerId);
 
   /**
    * Returns the set of peerIds of the clusters that have been connected and have an underlying

http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index f2e5647..268ba87 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -80,7 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
 
   // Map of peer clusters keyed by their id
-  private Map<String, ReplicationPeerZKImpl> peerClusters;
+  private ConcurrentMap<String, ReplicationPeerImpl> peerClusters;
   private final ReplicationQueueStorage queueStorage;
   private Abortable abortable;
 
@@ -232,7 +232,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       }
       String peerStateZNode = getPeerStateNode(id);
       try {
-        return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
+        return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
       } catch (KeeperException e) {
         throw new ReplicationException(e);
       } catch (DeserializationException e) {
@@ -270,7 +270,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public ReplicationPeer getConnectedPeer(String peerId) {
+  public ReplicationPeerImpl getConnectedPeer(String peerId) {
     return peerClusters.get(peerId);
   }
 
@@ -425,7 +425,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   public void peerDisconnected(String peerId) {
     ReplicationPeer rp = this.peerClusters.get(peerId);
     if (rp != null) {
-      ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
+      peerClusters.remove(peerId, rp);
     }
   }
 
@@ -442,7 +442,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       return false;
     }
 
-    ReplicationPeerZKImpl peer = null;
+    ReplicationPeerImpl peer = null;
     try {
       peer = createPeer(peerId);
     } catch (Exception e) {
@@ -451,8 +451,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     if (peer == null) {
       return false;
     }
-    ReplicationPeerZKImpl previous =
-      ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
+    ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer);
     if (previous == null) {
       LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
     } else {
@@ -495,19 +494,19 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
    * @return object representing the peer
    * @throws ReplicationException
    */
-  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
+  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
     Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
     if (pair == null) {
       return null;
     }
     Configuration peerConf = pair.getSecond();
 
-    ReplicationPeerZKImpl peer =
-        new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, pair.getFirst(), abortable);
+    ReplicationPeerImpl peer =
+        new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst());
 
     // Load peer state and peer config by reading zookeeper directly.
-    peer.getPeerState(true);
-    peer.getPeerConfig(true);
+    peer.refreshPeerState();
+    peer.refreshPeerConfig();
 
     return peer;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 6fe869c..8905d43 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -312,12 +312,15 @@ public abstract class TestReplicationStateBasic {
     rp.disablePeer(ID_ONE);
     // now we do not rely on zk watcher to trigger the state change so we need to trigger it
     // manually...
-    assertEquals(PeerState.DISABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
+    ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE);
+    peer.refreshPeerState();
+    assertEquals(PeerState.DISABLED, peer.getPeerState());
     assertConnectedPeerStatus(false, ID_ONE);
     rp.enablePeer(ID_ONE);
     // now we do not rely on zk watcher to trigger the state change so we need to trigger it
     // manually...
-    assertEquals(PeerState.ENABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
+    peer.refreshPeerState();
+    assertEquals(PeerState.ENABLED, peer.getPeerState());
     assertConnectedPeerStatus(true, ID_ONE);
 
     // Disconnect peer

http://git-wip-us.apache.org/repos/asf/hbase/blob/7defbdc2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 9b493d9..598357c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
-  private static final Logger LOG = Logger.getLogger(PeerProcedureHandlerImpl.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
 
   private ReplicationSourceManager replicationSourceManager;
 
@@ -49,10 +48,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
 
   @Override
   public void disablePeer(String peerId) throws ReplicationException, IOException {
-    ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
+    ReplicationPeerImpl peer =
+        replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
     if (peer != null) {
-      PeerState peerState = peer.getPeerState(true);
-      LOG.info("disablePeer state, peer id: " + peerId + ", state: " + peerState);
+      peer.refreshPeerState();
+      LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
     } else {
       throw new ReplicationException("No connected peer found, peerId=" + peerId);
     }
@@ -60,10 +60,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
 
   @Override
   public void enablePeer(String peerId) throws ReplicationException, IOException {
-    ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
+    ReplicationPeerImpl peer =
+        replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
     if (peer != null) {
-      PeerState peerState = peer.getPeerState(true);
-      LOG.info("enablePeer state, peer id: " + peerId + ", state: " + peerState);
+      peer.refreshPeerState();
+      LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
     } else {
       throw new ReplicationException("No connected peer found, peerId=" + peerId);
     }
@@ -71,11 +72,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
 
   @Override
   public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
-    ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
+    ReplicationPeerImpl peer =
+        replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
     if (peer == null) {
       throw new ReplicationException("No connected peer found, peerId=" + peerId);
     }
-    ReplicationPeerConfig rpc = peer.getPeerConfig(true);
-    peer.triggerPeerConfigChange(rpc);
+    peer.refreshPeerConfig();
   }
 }