You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/11/21 07:36:22 UTC

hbase git commit: HBASE-19293 Support add a disabled state replication peer directly

Repository: hbase
Updated Branches:
  refs/heads/master 8f806ab48 -> e1133d520


HBASE-19293 Support add a disabled state replication peer directly


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1133d52
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1133d52
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1133d52

Branch: refs/heads/master
Commit: e1133d52015ffce9ff6cc2f60d6d48536779d0e4
Parents: 8f806ab
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sat Nov 18 08:48:40 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Tue Nov 21 15:26:06 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Admin.java   | 89 ++++++++------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  | 13 ++-
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    |  6 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 10 +--
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java |  6 +-
 .../hbase/shaded/protobuf/RequestConverter.java |  7 +-
 .../src/main/protobuf/Replication.proto         |  1 +
 .../hbase/replication/ReplicationPeers.java     | 13 ++-
 .../replication/ReplicationPeersZKImpl.java     | 19 ++---
 .../org/apache/hadoop/hbase/master/HMaster.java |  6 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |  5 +-
 .../hadoop/hbase/master/MasterServices.java     |  3 +-
 .../master/replication/ReplicationManager.java  |  6 +-
 .../replication/TestReplicationAdmin.java       | 16 ++++
 .../hbase/master/MockNoopMasterServices.java    |  4 +-
 .../src/main/ruby/hbase/replication_admin.rb    |  8 +-
 hbase-shell/src/main/ruby/hbase_constants.rb    |  1 +
 .../src/main/ruby/shell/commands/add_peer.rb    |  4 +
 .../test/ruby/hbase/replication_admin_test.rb   | 23 ++++-
 src/main/asciidoc/_chapters/ops_mgt.adoc        |  1 +
 20 files changed, 147 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index fd02a48..6f1190e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -2466,111 +2465,97 @@ public interface Admin extends Abortable, Closeable {
    * Add a new replication peer for replicating data to slave cluster.
    * @param peerId a short name that identifies the peer
    * @param peerConfig configuration for the replication slave cluster
-   * @throws IOException
+   * @throws IOException if a remote or network exception occurs
    */
   default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
       throws IOException {
+    addReplicationPeer(peerId, peerConfig, true);
   }
 
   /**
+   * Add a new replication peer for replicating data to slave cluster.
+   * @param peerId a short name that identifies the peer
+   * @param peerConfig configuration for the replication slave cluster
+   * @param enabled peer state, true if ENABLED and false if DISABLED
+   * @throws IOException if a remote or network exception occurs
+   */
+  void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
+      throws IOException;
+
+  /**
    * Remove a peer and stop the replication.
    * @param peerId a short name that identifies the peer
-   * @throws IOException
+   * @throws IOException if a remote or network exception occurs
    */
-  default void removeReplicationPeer(String peerId) throws IOException {
-  }
+  void removeReplicationPeer(String peerId) throws IOException;
 
   /**
    * Restart the replication stream to the specified peer.
    * @param peerId a short name that identifies the peer
-   * @throws IOException
+   * @throws IOException if a remote or network exception occurs
    */
-  default void enableReplicationPeer(String peerId) throws IOException {
-  }
+  void enableReplicationPeer(String peerId) throws IOException;
 
   /**
    * Stop the replication stream to the specified peer.
    * @param peerId a short name that identifies the peer
-   * @throws IOException
+   * @throws IOException if a remote or network exception occurs
    */
-  default void disableReplicationPeer(String peerId) throws IOException {
-  }
+  void disableReplicationPeer(String peerId) throws IOException;
 
   /**
    * Returns the configured ReplicationPeerConfig for the specified peer.
    * @param peerId a short name that identifies the peer
    * @return ReplicationPeerConfig for the peer
-   * @throws IOException
+   * @throws IOException if a remote or network exception occurs
    */
-  default ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException {
-    return new ReplicationPeerConfig();
-  }
+  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException;
 
   /**
    * Update the peerConfig for the specified peer.
    * @param peerId a short name that identifies the peer
    * @param peerConfig new config for the peer
-   * @throws IOException
+   * @throws IOException if a remote or network exception occurs
    */
-  default void updateReplicationPeerConfig(String peerId,
-      ReplicationPeerConfig peerConfig) throws IOException {
-  }
+  void updateReplicationPeerConfig(String peerId,
+      ReplicationPeerConfig peerConfig) throws IOException;
 
   /**
    * Append the replicable table column family config from the specified peer.
    * @param id a short that identifies the cluster
    * @param tableCfs A map from tableName to column family names
-   * @throws ReplicationException
-   * @throws IOException
+   * @throws ReplicationException if tableCfs has conflict with existing config
+   * @throws IOException if a remote or network exception occurs
    */
-  default void appendReplicationPeerTableCFs(String id,
-      Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
-      IOException {
-  }
+  void appendReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs)
+      throws ReplicationException, IOException;
 
   /**
    * Remove some table-cfs from config of the specified peer.
    * @param id a short name that identifies the cluster
    * @param tableCfs A map from tableName to column family names
-   * @throws ReplicationException
-   * @throws IOException
-   */
-  default void removeReplicationPeerTableCFs(String id,
-      Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
-      IOException {
-  }
-
-  /**
-   * Return a list of replication peers.
-   * @return a list of replication peers description
-   * @throws IOException
+   * @throws ReplicationException if tableCfs has conflict with existing config
+   * @throws IOException if a remote or network exception occurs
    */
-  default List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
-    return new ArrayList<>();
-  }
+  void removeReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs)
+      throws ReplicationException, IOException;
 
   /**
    * Return a list of replication peers.
-   * @param regex The regular expression to match peer id
    * @return a list of replication peers description
-   * @throws IOException
-   * @deprecated since 2.0 version and will be removed in 3.0 version. Use
-   *             {@link #listReplicationPeers(Pattern)} instead.
+   * @throws IOException if a remote or network exception occurs
    */
-  @Deprecated
-  default List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException {
-    return new ArrayList<>();
-  }
+  List<ReplicationPeerDescription> listReplicationPeers() throws IOException;
 
   /**
    * Return a list of replication peers.
    * @param pattern The compiled regular expression to match peer id
    * @return a list of replication peers description
-   * @throws IOException
+   * @throws IOException if a remote or network exception occurs
    */
-  default List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException {
-    return new ArrayList<>();
-  }
+  List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;
 
   /**
    * Mark region server(s) as decommissioned to prevent additional regions from getting

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 0e0a673..abd8e5a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -499,8 +499,19 @@ public interface AsyncAdmin {
    * @param peerId a short name that identifies the peer
    * @param peerConfig configuration for the replication slave cluster
    */
+  default CompletableFuture<Void> addReplicationPeer(String peerId,
+      ReplicationPeerConfig peerConfig) {
+    return addReplicationPeer(peerId, peerConfig, true);
+  }
+
+  /**
+   * Add a new replication peer for replicating data to slave cluster
+   * @param peerId a short name that identifies the peer
+   * @param peerConfig configuration for the replication slave cluster
+   * @param enabled peer state, true if ENABLED and false if DISABLED
+   */
   CompletableFuture<Void> addReplicationPeer(String peerId,
-      ReplicationPeerConfig peerConfig);
+      ReplicationPeerConfig peerConfig, boolean enabled);
 
   /**
    * Remove a peer and stop the replication

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 3fe7951..fb16fce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -375,9 +375,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void>
-      addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) {
-    return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig));
+  public CompletableFuture<Void> addReplicationPeer(String peerId,
+      ReplicationPeerConfig peerConfig, boolean enabled) {
+    return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig, enabled));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 7669eb2..05157dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -103,7 +103,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -3850,13 +3849,13 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
+  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws IOException {
     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
       @Override
       protected Void rpcCall() throws Exception {
         master.addReplicationPeer(getRpcController(),
-          RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig));
+          RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
         return null;
       }
     });
@@ -3954,11 +3953,6 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException {
-    return listReplicationPeers(Pattern.compile(regex));
-  }
-
-  @Override
   public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
       throws IOException {
     return executeCallable(new MasterCallable<List<ReplicationPeerDescription>>(getConnection(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index fe1d685..f56e7ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -1501,14 +1501,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> addReplicationPeer(String peerId,
-      ReplicationPeerConfig peerConfig) {
+      ReplicationPeerConfig peerConfig, boolean enabled) {
     return this
         .<Void> newMasterCaller()
         .action(
           (controller, stub) -> this
               .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
-                RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
-                    done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+                RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s,
+                    c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 9eff114..4fdc87d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -1618,10 +1618,15 @@ public final class RequestConverter {
   }
 
   public static ReplicationProtos.AddReplicationPeerRequest buildAddReplicationPeerRequest(
-      String peerId, ReplicationPeerConfig peerConfig) {
+      String peerId, ReplicationPeerConfig peerConfig, boolean enabled) {
     AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder();
     builder.setPeerId(peerId);
     builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
+    ReplicationProtos.ReplicationState.Builder stateBuilder =
+        ReplicationProtos.ReplicationState.newBuilder();
+    stateBuilder.setState(enabled ? ReplicationProtos.ReplicationState.State.ENABLED
+        : ReplicationProtos.ReplicationState.State.DISABLED);
+    builder.setPeerState(stateBuilder.build());
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 7e78144..88efa00 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -77,6 +77,7 @@ message ReplicationHLogPosition {
 message AddReplicationPeerRequest {
   required string peer_id = 1;
   required ReplicationPeer peer_config = 2;
+  required ReplicationState peer_state = 3;
 }
 
 message AddReplicationPeerResponse {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 3e2e1ad..10936bf 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -51,7 +51,18 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
    */
-  void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
+  default void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
+      throws ReplicationException {
+    registerPeer(peerId, peerConfig, true);
+  }
+
+  /**
+   * Add a new remote slave cluster for replication.
+   * @param peerId a short that identifies the cluster
+   * @param peerId a short name that identifies the cluster
+   * @param enabled peer state, true if ENABLED and false if DISABLED
+   */
+  void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 9c4e3fe..b7564f4 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -105,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public void registerPeer(String id, ReplicationPeerConfig peerConfig)
+  public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException {
     try {
       if (peerExists(id)) {
@@ -130,19 +130,18 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
 
       List<ZKUtilOp> listOfOps = new ArrayList<>(2);
-      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
-        ReplicationPeerConfigUtil.toByteArray(peerConfig));
-      // b/w PeerWatcher and ReplicationZookeeper#add method to create the
-      // peer-state znode. This happens while adding a peer
-      // The peer state data is set as "ENABLED" by default.
-      ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
+      ZKUtilOp op1 =
+          ZKUtilOp.createAndFailSilent(getPeerNode(id),
+            ReplicationPeerConfigUtil.toByteArray(peerConfig));
+      ZKUtilOp op2 =
+          ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
+              : DISABLED_ZNODE_BYTES);
       listOfOps.add(op1);
       listOfOps.add(op2);
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-      // A peer is enabled by default
     } catch (KeeperException e) {
-      throw new ReplicationException("Could not add peer with id=" + id
-          + ", peerConfif=>" + peerConfig, e);
+      throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
+          + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 1967296..97982b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3322,14 +3322,14 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
-  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
+  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException, IOException {
     if (cpHost != null) {
       cpHost.preAddReplicationPeer(peerId, peerConfig);
     }
     LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config="
-        + peerConfig);
-    this.replicationManager.addReplicationPeer(peerId, peerConfig);
+        + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
+    this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled);
     if (cpHost != null) {
       cpHost.postAddReplicationPeer(peerId, peerConfig);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 2e3df2d..2fd60af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
-
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
@@ -267,6 +266,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@@ -1809,7 +1809,8 @@ public class MasterRpcServices extends RSRpcServices
       AddReplicationPeerRequest request) throws ServiceException {
     try {
       master.addReplicationPeer(request.getPeerId(),
-        ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
+        ReplicationPeerConfigUtil.convert(request.getPeerConfig()), request.getPeerState()
+            .getState().equals(ReplicationState.State.ENABLED));
       return AddReplicationPeerResponse.newBuilder().build();
     } catch (ReplicationException | IOException e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 57f7df5..6b3c212 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -428,8 +428,9 @@ public interface MasterServices extends Server {
    * Add a new replication peer for replicating data to slave cluster
    * @param peerId a short name that identifies the peer
    * @param peerConfig configuration for the replication slave cluster
+   * @param enabled peer state, true if ENABLED and false if DISABLED
    */
-  void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
+  void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException, IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
index 7cfaefd..3615992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Manages and performs all replication admin operations.
@@ -69,12 +69,12 @@ public class ReplicationManager {
     }
   }
 
-  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
+  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException, IOException {
     checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
       peerConfig.getTableCFsMap());
     checkConfiguredWALEntryFilters(peerConfig);
-    replicationPeers.registerPeer(peerId, peerConfig);
+    replicationPeers.registerPeer(peerId, peerConfig, enabled);
     replicationPeers.peerConnected(peerId);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c98a02c..036706a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -141,6 +142,21 @@ public class TestReplicationAdmin {
     assertEquals(0, admin.getPeersCount());
   }
 
+  @Test
+  public void testAddPeerWithState() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
+    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
+
+    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+    rpc2.setClusterKey(KEY_SECOND);
+    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
+    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
+    hbaseAdmin.removeReplicationPeer(ID_SECOND);
+  }
+
   /**
    * Tests that the peer configuration used by ReplicationAdmin contains all
    * the peer's properties.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index dadec1f..a752e9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -393,7 +393,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
   }
 
   @Override
-  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
+  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws ReplicationException {
   }
 
@@ -460,4 +460,4 @@ public class MockNoopMasterServices implements MasterServices, Server {
   public FileSystem getFileSystem() {
     return null;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index ceb728e..3f64356 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -65,6 +65,7 @@ module Hbase
         data = args.fetch(DATA, nil)
         table_cfs = args.fetch(TABLE_CFS, nil)
         namespaces = args.fetch(NAMESPACES, nil)
+        peer_state = args.fetch(STATE, nil)
 
         # Create and populate a ReplicationPeerConfig
         replication_peer_config = ReplicationPeerConfig.new
@@ -102,7 +103,12 @@ module Hbase
           end
           replication_peer_config.set_table_cfs_map(map)
         end
-        @admin.addReplicationPeer(id, replication_peer_config)
+
+        enabled = true
+        unless peer_state.nil?
+          enabled = false if peer_state == 'DISABLED'
+        end
+        @admin.addReplicationPeer(id, replication_peer_config, enabled)
       else
         raise(ArgumentError, 'args must be a Hash')
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-shell/src/main/ruby/hbase_constants.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb
index ebaae78..28484cb 100644
--- a/hbase-shell/src/main/ruby/hbase_constants.rb
+++ b/hbase-shell/src/main/ruby/hbase_constants.rb
@@ -79,6 +79,7 @@ module HBaseConstants
   CLUSTER_KEY = 'CLUSTER_KEY'.freeze
   TABLE_CFS = 'TABLE_CFS'.freeze
   NAMESPACES = 'NAMESPACES'.freeze
+  STATE = 'STATE'.freeze
   CONFIG = 'CONFIG'.freeze
   DATA = 'DATA'.freeze
   SERVER_NAME = 'SERVER_NAME'.freeze

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
index edaa386..eb2da83 100644
--- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
@@ -28,6 +28,8 @@ must be specified to identify the peer.
 For a HBase cluster peer, a cluster key must be provided and is composed like this:
 hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
 This gives a full path for HBase to connect to another HBase cluster.
+An optional parameter for state identifies whether the replication peer is enabled or disabled.
+The default state is enabled.
 An optional parameter for namespaces identifies which namespace's tables will be replicated
 to the peer cluster.
 An optional parameter for table column families identifies which tables and/or column families
@@ -40,6 +42,8 @@ then you can't set this namespace's tables in the peer config again.
 Examples:
 
   hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
+  hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "ENABLED"
+  hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "DISABLED"
   hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
     TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
   hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 0d92287..75f3c04 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -93,7 +93,7 @@ module Hbase
       command(:remove_peer, @peer_id)
     end
 
-    define_test "add_peer: single zk cluster key - peer config" do
+    define_test "add_peer: single zk cluster key with enabled/disabled state" do
       cluster_key = "server1.cie.com:2181:/hbase"
 
       args = { CLUSTER_KEY => cluster_key }
@@ -101,9 +101,26 @@ module Hbase
 
       assert_equal(1, command(:list_peers).length)
       assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
-      assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+      assert_equal(true, command(:list_peers).get(0).isEnabled)
+
+      command(:remove_peer, @peer_id)
+
+      enable_args = { CLUSTER_KEY => cluster_key, STATE => 'ENABLED' }
+      command(:add_peer, @peer_id, enable_args)
+
+      assert_equal(1, command(:list_peers).length)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(true, command(:list_peers).get(0).isEnabled)
+
+      command(:remove_peer, @peer_id)
+
+      disable_args = { CLUSTER_KEY => cluster_key, STATE => 'DISABLED' }
+      command(:add_peer, @peer_id, disable_args)
+
+      assert_equal(1, command(:list_peers).length)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(false, command(:list_peers).get(0).isEnabled)
 
-      # cleanup for future tests
       command(:remove_peer, @peer_id)
     end
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1133d52/src/main/asciidoc/_chapters/ops_mgt.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index b6babd6..2bb2510 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -1415,6 +1415,7 @@ add_peer <ID> <CLUSTER_KEY>::
   Adds a replication relationship between two clusters. +
   * ID -- a unique string, which must not contain a hyphen.
   * CLUSTER_KEY: composed using the following template, with appropriate place-holders: `hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent`
+  * STATE (optional): ENABLED or DISABLED; the default value is ENABLED
 list_peers:: list all replication relationships known by this cluster
 enable_peer <ID>::
   Enable a previously-disabled replication relationship