Posted to commits@hbase.apache.org by jd...@apache.org on 2010/10/20 02:21:09 UTC

svn commit: r1024470 - in /hbase/trunk: ./ bin/replication/ src/main/java/org/apache/hadoop/hbase/client/replication/ src/main/java/org/apache/hadoop/hbase/replication/ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ src/main/java/org/a...

Author: jdcryans
Date: Wed Oct 20 00:21:08 2010
New Revision: 1024470

URL: http://svn.apache.org/viewvc?rev=1024470&view=rev
Log:
HBASE-2201  JRuby shell for replication

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
    hbase/trunk/src/main/ruby/hbase/replication_admin.rb
    hbase/trunk/src/main/ruby/shell/commands/add_peer.rb
    hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb
    hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb
    hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb
    hbase/trunk/src/main/ruby/shell/commands/start_replication.rb
    hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
Removed:
    hbase/trunk/bin/replication/add_peer.rb
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
    hbase/trunk/src/main/ruby/hbase.rb
    hbase/trunk/src/main/ruby/hbase/hbase.rb
    hbase/trunk/src/main/ruby/shell.rb
    hbase/trunk/src/main/ruby/shell/commands.rb
    hbase/trunk/src/site/xdoc/replication.xml
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Oct 20 00:21:08 2010
@@ -1072,6 +1072,7 @@ Release 0.21.0 - Unreleased
    HBASE-3073  New APIs for Result, faster implementation for some calls
    HBASE-3053  Add ability to have multiple Masters LocalHBaseCluster for
                test writing
+   HBASE-2201  JRuby shell for replication
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Wed Oct 20 00:21:08 2010
@@ -0,0 +1,165 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.replication;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * <p>
+ * This class provides the administrative interface to HBase cluster
+ * replication. In order to use it, the cluster and the client using
+ * ReplicationAdmin must be configured with <code>hbase.replication</code>
+ * set to true.
+ * </p>
+ * <p>
+ * Adding a new peer results in creating new outbound connections from every
+ * region server to a subset of region servers on the slave cluster. Each
+ * new stream of replication will start replicating from the beginning of the
+ * current HLog, meaning that edits from the past will be replicated.
+ * </p>
+ * <p>
+ * Removing a peer is a destructive and irreversible operation that stops
+ * all the replication streams for the given cluster and deletes the metadata
+ * used to keep track of the replication state.
+ * </p>
+ * <p>
+ * Enabling and disabling peers is currently not supported.
+ * </p>
+ * <p>
+ * As cluster replication is still experimental, a kill switch is provided
+ * in order to stop all replication-related operations, see
+ * {@link #setReplicating(boolean)}. When setting it back to true, the new
+ * state of all the replication streams will be unknown and may have holes.
+ * Use at your own risk.
+ * </p>
+ * <p>
+ * To see which commands are available in the shell, type
+ * <code>replication</code>.
+ * </p>
+ */
+public class ReplicationAdmin {
+
+  private final ReplicationZookeeper replicationZk;
+
+  /**
+   * Constructor that creates a connection to the local ZooKeeper ensemble.
+   * @param conf Configuration to use
+   * @throws IOException if the connection to ZK cannot be made
+   * @throws RuntimeException if replication isn't enabled.
+   */
+  public ReplicationAdmin(Configuration conf) throws IOException {
+    if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      throw new RuntimeException("hbase.replication isn't true, please " +
+          "enable it in order to use replication");
+    }
+    ZooKeeperWatcher zkw = HConnectionManager.getConnection(conf).
+        getZooKeeperWatcher();
+    try {
+      this.replicationZk = new ReplicationZookeeper(conf, zkw);
+    } catch (KeeperException e) {
+      throw new IOException("Unable setup the ZooKeeper connection", e);
+    }
+  }
+
+  /**
+   * Add a new peer cluster to replicate to.
+   * @param id a short that identifies the cluster
+   * @param clusterKey the concatenation of the slave cluster's
+   * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
+   * @throws IllegalStateException if there's already one slave since
+   * multi-slave isn't supported yet.
+   */
+  public void addPeer(String id, String clusterKey) throws IOException {
+    this.replicationZk.addPeer(id, clusterKey);
+  }
+
+  /**
+   * Removes a peer cluster and stops the replication to it.
+   * @param id a short that identifies the cluster
+   */
+  public void removePeer(String id) throws IOException {
+    this.replicationZk.removePeer(id);
+  }
+
+  /**
+   * Restart the replication stream to the specified peer.
+   * @param id a short that identifies the cluster
+   */
+  public void enablePeer(String id) {
+    throw new NotImplementedException("Not implemented");
+  }
+
+  /**
+   * Stop the replication stream to the specified peer.
+   * @param id a short that identifies the cluster
+   */
+  public void disablePeer(String id) {
+    throw new NotImplementedException("Not implemented");
+  }
+
+  /**
+   * Get the number of slave clusters the local cluster has.
+   * @return number of slave clusters
+   */
+  public int getPeersCount() {
+    return this.replicationZk.listPeersIdsAndWatch().size();
+  }
+
+  /**
+   * Get the current status of the kill switch, if the cluster is replicating
+   * or not.
+   * @return true if the cluster is replicating, otherwise false
+   */
+  public boolean getReplicating() throws IOException {
+    try {
+      return this.replicationZk.getReplication();
+    } catch (KeeperException e) {
+      throw new IOException("Couldn't get the replication status");
+    }
+  }
+
+  /**
+   * Kill switch for all replication-related features
+   * @param newState true to start replication, false to stop it
+   * completely
+   * @return the previous state
+   */
+  public boolean setReplicating(boolean newState) throws IOException {
+    boolean prev = getReplicating();
+    this.replicationZk.setReplicating(newState);
+    return prev;
+  }
+
+  /**
+   * Get the ZK-support tool created and used by this object for replication.
+   * @return the ZK-support tool
+   */
+  ReplicationZookeeper getReplicationZk() {
+    return replicationZk;
+  }
+}
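
For context, a minimal sketch of how a client might drive the ReplicationAdmin
API added above; hbase.replication must already be true in the client's
configuration or the constructor throws. The peer id, the cluster key and the
use of HBaseConfiguration.create() below are illustrative assumptions, not
part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

    public class ReplicationAdminSketch {
      public static void main(String[] args) throws Exception {
        // Assumes hbase.replication=true in hbase-site.xml.
        Configuration conf = HBaseConfiguration.create();
        ReplicationAdmin admin = new ReplicationAdmin(conf);

        // Hypothetical peer: quorum ":" client port ":" znode parent.
        admin.addPeer("1", "zk1,zk2,zk3:2181:/hbase");
        System.out.println("Slave clusters: " + admin.getPeersCount());

        // Kill switch; returns the previous state.
        boolean wasReplicating = admin.setReplicating(false);
        System.out.println("Was replicating: " + wasReplicating);
        admin.setReplicating(true);

        // Destructive: stops the stream and deletes its replication metadata.
        admin.removePeer("1");
      }
    }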

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Wed Oct 20 00:21:08 2010
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * This class acts as a wrapper for all the objects used to identify and
+ * communicate with remote peers. Everything needs to be created for objects
+ * of this class as it doesn't encapsulate any specific functionality, i.e.
+ * it's a container class.
+ */
+public class ReplicationPeer {
+
+  private final String clusterKey;
+  private final String id;
+  private List<HServerAddress> regionServers =
+      new ArrayList<HServerAddress>(0);
+  private final AtomicBoolean peerEnabled = new AtomicBoolean();
+  // Cannot be final since the object needs to be recreated when the session fails
+  private ZooKeeperWatcher zkw;
+  private final Configuration conf;
+
+  /**
+   * Constructor that takes all the objects required to communicate with the
+   * specified peer, except for the region server addresses.
+   * @param conf configuration object for this peer
+   * @param key cluster key used to locate the peer
+   * @param id string representation of this peer's identifier
+   * @param zkw zookeeper connection to the peer
+   */
+  public ReplicationPeer(Configuration conf, String key,
+      String id, ZooKeeperWatcher zkw) {
+    this.conf = conf;
+    this.clusterKey = key;
+    this.id = id;
+    this.zkw = zkw;
+  }
+
+  /**
+   * Get the cluster key of that peer
+   * @return string consisting of zk ensemble addresses, client port
+   * and root znode
+   */
+  public String getClusterKey() {
+    return clusterKey;
+  }
+
+  /**
+   * Get the state of this peer
+   * @return atomic boolean that holds the status
+   */
+  public AtomicBoolean getPeerEnabled() {
+    return peerEnabled;
+  }
+
+  /**
+   * Get a list of all the addresses of all the region servers
+   * for this peer cluster
+   * @return list of addresses
+   */
+  public List<HServerAddress> getRegionServers() {
+    return regionServers;
+  }
+
+  /**
+   * Set the list of region servers for that peer
+   * @param regionServers list of addresses for the region servers
+   */
+  public void setRegionServers(List<HServerAddress> regionServers) {
+    this.regionServers = regionServers;
+  }
+
+  /**
+   * Get the ZK connection to this peer
+   * @return zk connection
+   */
+  public ZooKeeperWatcher getZkw() {
+    return zkw;
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id (short)
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Wed Oct 20 00:21:08 2010
@@ -54,7 +54,6 @@ import org.apache.zookeeper.KeeperExcept
  * <p/>
  * <pre>
  * replication/
- *  master     {contains a full cluster address}
  *  state      {contains true or false}
  *  clusterId  {contains a byte}
  *  peers/
@@ -80,8 +79,8 @@ public class ReplicationZookeeper {
   private final static String RS_LOCK_ZNODE = "lock";
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
-  // Map of addresses of peer clusters with their ZKW
-  private Map<String, ZooKeeperWatcher> peerClusters;
+  // Map of peer clusters keyed by their id
+  private Map<String, ReplicationPeer> peerClusters;
   // Path to the root replication znode
   private String replicationZNode;
   // Path to the peer clusters znode
@@ -92,16 +91,22 @@ public class ReplicationZookeeper {
   private String rsServerNameZnode;
   // Name node if the replicationState znode
   private String replicationStateNodeName;
-  // If this RS is part of a master cluster
-  private boolean replicationMaster;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
   private AtomicBoolean replicating;
   // Byte (stored as string here) that identifies this cluster
   private String clusterId;
+  // The key to our own cluster
+  private String ourClusterKey;
   // Abortable
   private Abortable abortable;
 
+  /**
+   * Constructor used by clients of replication (like master and HBase clients)
+   * @param conf  conf to use
+   * @param zk    zk connection to use
+   * @throws KeeperException
+   */
   public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
     throws KeeperException {
 
@@ -125,22 +130,18 @@ public class ReplicationZookeeper {
     this.conf = server.getConfiguration();
     setZNodes();
 
-    this.peerClusters = new HashMap<String, ZooKeeperWatcher>();
+    this.peerClusters = new HashMap<String, ReplicationPeer>();
     this.replicating = replicating;
-    setReplicating();
+    ZKUtil.createWithParents(this.zookeeper,
+        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
+    readReplicationStateZnode();
     this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
     ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
     // Set a tracker on replicationStateNodeNode
     ReplicationStatusTracker tracker =
         new ReplicationStatusTracker(this.zookeeper, server);
     tracker.start();
-
-    List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    if (znodes != null) {
-      for (String z : znodes) {
-        connectToPeer(z);
-      }
-    }
+    connectExistingPeers();
   }
 
   private void setZNodes() throws KeeperException {
@@ -156,27 +157,44 @@ public class ReplicationZookeeper {
         conf.get("zookeeper.znode.replication.clusterId", "clusterId");
     String rsZNodeName =
         conf.get("zookeeper.znode.replication.rs", "rs");
-    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+    this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
           this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
           this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
     this.replicationZNode =
       ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+    ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
+    ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
 
     String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
     byte [] data = ZKUtil.getData(this.zookeeper, znode);
     String idResult = Bytes.toString(data);
     this.clusterId = idResult == null?
       Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
+  }
 
-    znode = ZKUtil.joinZNode(this.replicationZNode, repMasterZNodeName);
-    data = ZKUtil.getData(this.zookeeper, znode);
-    String address = Bytes.toString(data);
-    this.replicationMaster = thisCluster.equals(address);
-    LOG.info("This cluster (" + thisCluster + ") is a " +
-      (this.replicationMaster ? "master" : "slave") + " for replication" +
-        ", compared with (" + address + ")");
+  private void connectExistingPeers() throws IOException, KeeperException {
+    List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    if (znodes != null) {
+      for (String z : znodes) {
+        connectToPeer(z);
+      }
+    }
+  }
+
+  /**
+   * List this cluster's peers' IDs and set watches on them
+   * @return list of all peers' identifiers
+   */
+  public List<String> listPeersIdsAndWatch() {
+    List<String> ids = null;
+    try {
+      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return ids;
   }
 
   /**
@@ -185,16 +203,32 @@ public class ReplicationZookeeper {
    * @param peerClusterId (byte) the cluster to interrogate
    * @return addresses of all region servers
    */
-  public List<HServerAddress> getPeersAddresses(String peerClusterId)
+  public List<HServerAddress> getSlavesAddresses(String peerClusterId)
       throws KeeperException {
     if (this.peerClusters.size() == 0) {
       return new ArrayList<HServerAddress>(0);
     }
-    ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId);
-    
-    return zkw == null?
-      new ArrayList<HServerAddress>(0):
-      ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
+    ReplicationPeer peer = this.peerClusters.get(peerClusterId);
+    if (peer == null) {
+      return new ArrayList<HServerAddress>(0);
+    }
+    peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
+    return peer.getRegionServers();
+  }
+
+  /**
+   * Get the list of all the region servers from the specified peer
+   * @param zkw zk connection to use
+   * @return list of region server addresses
+   */
+  private List<HServerAddress> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
+    List<HServerAddress> rss = null;
+    try {
+      rss = ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
+    } catch (KeeperException e) {
+      LOG.warn("Cannot get peer's region server addresses", e);
+    }
+    return rss;
   }
 
   /**
@@ -203,44 +237,146 @@ public class ReplicationZookeeper {
    * @param peerId id of the peer cluster
    * @throws KeeperException 
    */
-  private void connectToPeer(String peerId) throws IOException, KeeperException {
+  public boolean connectToPeer(String peerId)
+      throws IOException, KeeperException {
+    if (peerClusters == null) {
+      return false;
+    }
+    if (this.peerClusters.containsKey(peerId)) {
+      return false;
+      // TODO remove when we support it
+    } else if (this.peerClusters.size() > 0) {
+      LOG.warn("Multiple slaves feature not supported");
+      return false;
+    }
+    ReplicationPeer peer = getPeer(peerId);
+    if (peer == null) {
+      return false;
+    }
+    this.peerClusters.put(peerId, peer);
+    ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
+        this.rsServerNameZnode, peerId));
+    LOG.info("Added new peer cluster " + peer.getClusterKey());
+    return true;
+  }
+
+  /**
+   * Helper method to connect to a peer
+   * @param peerId peer's identifier
+   * @return object representing the peer
+   * @throws IOException
+   * @throws KeeperException
+   */
+  private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
     byte [] data = ZKUtil.getData(this.zookeeper, znode);
-    String [] ensemble = Bytes.toString(data).split(":");
+    String otherClusterKey = Bytes.toString(data);
+    if (this.ourClusterKey.equals(otherClusterKey)) {
+      LOG.debug("Not connecting to " + peerId + " because it's us");
+      return null;
+    }
+    String[] ensemble = otherClusterKey.split(":");
     if (ensemble.length != 3) {
-      throw new IllegalArgumentException("Wrong format of cluster address: " +
+      LOG.warn("Wrong format of cluster address: " +
         Bytes.toStringBinary(data));
+      return null;
     }
+    // Construct the connection to the new peer
     Configuration otherConf = new Configuration(this.conf);
     otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
     otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
     otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
         "connection to cluster: " + peerId, this.abortable);
-    this.peerClusters.put(peerId, zkw);
-    ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
-        this.rsServerNameZnode, peerId));
-    LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+    return new ReplicationPeer(otherConf, otherClusterKey,
+        peerId, zkw);
   }
 
   /**
-   * This reads the state znode for replication and sets the atomic boolean
+   * Set the new replication state for this cluster
+   * @param newState the new state to store in ZooKeeper
+   */
+  public void setReplicating(boolean newState) throws IOException {
+    try {
+      ZKUtil.createWithParents(this.zookeeper,
+        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
+      ZKUtil.setData(this.zookeeper,
+          ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
+          Bytes.toBytes(Boolean.toString(newState)));
+    } catch (KeeperException e) {
+      throw new IOException("Unable to set the replication state", e);
+    }
+  }
+
+  /**
+   * Remove the peer from zookeeper, which will trigger the watchers on every
+   * region server and close their sources
+   * @param id the peer's identifier
+   * @throws IllegalArgumentException Thrown when the peer doesn't exist
    */
-  private void setReplicating() {
+  public void removePeer(String id) throws IOException {
     try {
-      byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
-      String value = Bytes.toString(data);
-      if (value == null) LOG.info(getRepStateNode() + " data is null");
-      else {
-        this.replicating.set(Boolean.parseBoolean(value));
-        LOG.info("Replication is now " + (this.replicating.get()?
-          "started" : "stopped"));
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("Cannot remove inexisting peer");
       }
+      ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+    } catch (KeeperException e) {
+      throw new IOException("Unable to remove a peer", e);
+    }
+  }
+
+  /**
+   * Add a new peer to this cluster
+   * @param id peer's identifier
+   * @param clusterKey ZK ensemble's addresses, client port and root znode
+   * @throws IllegalArgumentException Thrown when a peer with the same id already exists
+   * @throws IllegalStateException Thrown when another peer already exists, since
+   *         multi-slave isn't supported yet.
+   */
+  public void addPeer(String id, String clusterKey) throws IOException {
+    try {
+      if (peerExists(id)) {
+        throw new IllegalArgumentException("Cannot add existing peer");
+      } else if (countPeers() > 0) {
+        throw new IllegalStateException("Multi-slave isn't supported yet");
+      }
+      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      ZKUtil.createAndWatch(this.zookeeper,
+          ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+    } catch (KeeperException e) {
+      throw new IOException("Unable to add peer", e);
+    }
+  }
+
+  private boolean peerExists(String id) throws KeeperException {
+    return ZKUtil.checkExists(this.zookeeper,
+          ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
+  }
+
+  private int countPeers() throws KeeperException {
+    List<String> peers =
+        ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    return peers == null ? 0 : peers.size();
+  }
+
+  /**
+   * This reads the state znode for replication and sets the atomic boolean
+   */
+  private void readReplicationStateZnode() {
+    try {
+      this.replicating.set(getReplication());
+      LOG.info("Replication is now " + (this.replicating.get()?
+        "started" : "stopped"));
     } catch (KeeperException e) {
       this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
     }
   }
 
+  public boolean getReplication() throws KeeperException {
+    byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
+    return Boolean.parseBoolean(Bytes.toString(data));
+  }
+
   private String getRepStateNode() {
     return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
   }
@@ -303,13 +439,8 @@ public class ReplicationZookeeper {
   public List<String> getRegisteredRegionServers() {
     List<String> result = null;
     try {
-      List<ZKUtil.NodeAndData> nads =
-          ZKUtil.watchAndGetNewChildren(this.zookeeper, this.zookeeper.rsZNode);
-      result = new ArrayList<String>(nads.size());
-      for (ZKUtil.NodeAndData nad : nads) {
-        String[] fullPath = nad.getNode().split("/");
-        result.add(fullPath[fullPath.length - 1]);
-      }
+      result = ZKUtil.listChildrenAndWatchThem(
+          this.zookeeper, this.zookeeper.rsZNode);
     } catch (KeeperException e) {
       this.abortable.abort("Get list of registered region servers", e);
     }
@@ -442,10 +573,14 @@ public class ReplicationZookeeper {
    * Delete a complete queue of hlogs
    * @param peerZnode znode of the peer cluster queue of hlogs to delete
    */
-  public void deleteSource(String peerZnode) {
+  public void deleteSource(String peerZnode, boolean closeConnection) {
     try {
       ZKUtil.deleteNodeRecursively(this.zookeeper,
           ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
+      if (closeConnection) {
+        this.peerClusters.get(peerZnode).getZkw().close();
+        this.peerClusters.remove(peerZnode);
+      }
     } catch (KeeperException e) {
       this.abortable.abort("Failed delete of " + peerZnode, e);
     }
@@ -506,24 +641,39 @@ public class ReplicationZookeeper {
 
   /**
    * Get a map of all peer clusters
-   * @return map of peer cluster, zk address to ZKW
+   * @return map of peer clusters keyed by their id
    */
-  public Map<String, ZooKeeperWatcher> getPeerClusters() {
+  public Map<String, ReplicationPeer> getPeerClusters() {
     return this.peerClusters;
   }
 
-  public String getRSZNode() {
-    return rsZNode;
+  /**
+   * Extracts the znode name of a peer cluster from a ZK path
+   * @param fullPath Path to extract the id from
+   * @return the id or an empty string if path is invalid
+   */
+  public static String getZNodeName(String fullPath) {
+    String[] parts = fullPath.split("/");
+    return parts.length > 0 ? parts[parts.length-1] : "";
   }
 
   /**
-   * 
-   * @return
+   * Get this cluster's zk connection
+   * @return zk connection
    */
   public ZooKeeperWatcher getZookeeperWatcher() {
     return this.zookeeper;
   }
 
+
+  /**
+   * Get the full path to the peers' znode
+   * @return path to peers in zk
+   */
+  public String getPeersZNode() {
+    return peersZNode;
+  }
+
   /**
    * Tracker for status of the replication
    */
@@ -536,7 +686,7 @@ public class ReplicationZookeeper {
     @Override
     public synchronized void nodeDataChanged(String path) {
       super.nodeDataChanged(path);
-      setReplicating();
+      readReplicationStateZnode();
     }
   }
 }
\ No newline at end of file
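
For context, the cluster key that addPeer() stores is expected to have the
same quorum:clientPort:znodeParent shape that setZNodes() above uses to build
ourClusterKey. A minimal sketch of building such a key from a Configuration;
HBaseConfiguration.create() and the example output are illustrative
assumptions, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class ClusterKeySketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same concatenation as ReplicationZookeeper.setZNodes():
        // quorum ":" client port ":" znode parent, e.g. "zk1,zk2,zk3:2181:/hbase"
        String clusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
            conf.get("hbase.zookeeper.property.clientPort") + ":" +
            conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
        System.out.println("Cluster key: " + clusterKey);
      }
    }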

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Oct 20 00:21:08 2010
@@ -126,6 +126,9 @@ public class ReplicationSource extends T
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
+  // If source is enabled, replication happens. If disabled, nothing will be
+  // replicated but HLogs will still be queued
+  private AtomicBoolean sourceEnabled = new AtomicBoolean();
 
   /**
    * Instantiation method used by region servers
@@ -199,7 +202,7 @@ public class ReplicationSource extends T
   private void chooseSinks() throws KeeperException {
     this.currentPeers.clear();
     List<HServerAddress> addresses =
-        this.zkHelper.getPeersAddresses(peerClusterId);
+        this.zkHelper.getSlavesAddresses(peerClusterId);
     Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.info("Getting " + nbPeers +
@@ -236,14 +239,20 @@ public class ReplicationSource extends T
         this.position = this.zkHelper.getHLogRepPosition(
             this.peerClusterZnode, this.queue.peek().getName());
       } catch (KeeperException e) {
-        LOG.error("Couldn't get the position of this recovered queue " +
+        this.terminate("Couldn't get the position of this recovered queue " +
             peerClusterZnode, e);
-        this.abort();
       }
     }
     int sleepMultiplier = 1;
     // Loop until we close down
     while (!stopper.isStopped() && this.running) {
+      // Sleep until replication is enabled again
+      if (!this.replicating.get() || !this.sourceEnabled.get()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
       // Get a new path
       if (!getNextPath()) {
         if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -419,7 +428,7 @@ public class ReplicationSource extends T
    */
   protected boolean openReader(int sleepMultiplier) {
     try {
-      LOG.info("Opening log for replication " + this.currentPath.getName() +
+      LOG.debug("Opening log for replication " + this.currentPath.getName() +
           " at " + this.position);
       try {
        this.reader = null;
@@ -445,6 +454,12 @@ public class ReplicationSource extends T
           // TODO What happens if the log was missing from every single location?
           // Although we need to check a couple of times as the log could have
           // been moved by the master between the checks
+          // It can also happen if a recovered queue wasn't properly cleaned,
+          // such that the znode pointing to a log exists but the log was
+          // deleted a long time ago.
+          // For the moment, we'll throw the IO and processEndOfFile
+          throw new IOException("File from recovered queue is " +
+              "nowhere to be found", fnfe);
         } else {
           // If the log was archived, continue reading from there
           Path archivedLogLocation =
@@ -590,7 +605,7 @@ public class ReplicationSource extends T
       return true;
     } else if (this.queueRecovered) {
       this.manager.closeRecoveredQueue(this);
-      this.abort();
+      this.terminate("Finished recovering the queue");
       return true;
     }
     return false;
@@ -601,25 +616,26 @@ public class ReplicationSource extends T
     Thread.UncaughtExceptionHandler handler =
         new Thread.UncaughtExceptionHandler() {
           public void uncaughtException(final Thread t, final Throwable e) {
-            LOG.fatal("Set stop flag in " + t.getName(), e);
-            abort();
+            terminate("Uncaught exception during runtime", new Exception(e));
           }
         };
     Threads.setDaemonThreadRunning(
         this, n + ".replicationSource," + peerClusterZnode, handler);
   }
 
-  /**
-   * Hastily stop the replication, then wait for shutdown
-   */
-  private void abort() {
-    LOG.info("abort");
-    this.running = false;
-    terminate();
+  public void terminate(String reason) {
+    terminate(reason, null);
   }
 
-  public void terminate() {
-    LOG.info("terminate");
+  public void terminate(String reason, Exception cause) {
+    if (cause == null) {
+      LOG.info("Closing source "
+          + this.peerClusterZnode + " because: " + reason);
+    } else {
+      LOG.error("Closing source " + this.peerClusterZnode
+          + " because an error occurred: " + reason, cause);
+    }
+    this.running = false;
     Threads.shutdown(this, this.sleepForRetries);
   }
 
@@ -663,23 +679,22 @@ public class ReplicationSource extends T
     return down;
   }
 
-  /**
-   * Get the id that the source is replicating to
-   *
-   * @return peer cluster id
-   */
   public String getPeerClusterZnode() {
     return this.peerClusterZnode;
   }
 
-  /**
-   * Get the path of the current HLog
-   * @return current hlog's path
-   */
+  public String getPeerClusterId() {
+    return this.peerClusterId;
+  }
+
   public Path getCurrentPath() {
     return this.currentPath;
   }
 
+  public void setSourceEnabled(boolean status) {
+    this.sourceEnabled.set(status);
+  }
+
   /**
    * Comparator used to compare logs together based on their start time
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Wed Oct 20 00:21:08 2010
@@ -68,8 +68,16 @@ public interface ReplicationSourceInterf
 
   /**
    * End the replication
+   * @param reason why it's terminating
    */
-  public void terminate();
+  public void terminate(String reason);
+
+  /**
+   * End the replication
+   * @param reason why it's terminating
+   * @param cause the error that's causing it
+   */
+  public void terminate(String reason, Exception cause);
 
   /**
    * Get the id that the source is replicating to
@@ -77,4 +85,17 @@ public interface ReplicationSourceInterf
    * @return peer cluster id
    */
   public String getPeerClusterZnode();
+
+  /**
+   * Get the id that the source is replicating to.
+   *
+   * @return peer cluster id
+   */
+  public String getPeerClusterId();
+
+  /**
+   * Set if this source is enabled or disabled
+   * @param status the new status
+   */
+  public void setSourceEnabled(boolean status);
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Wed Oct 20 00:21:08 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Stoppable
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This class is responsible to manage all the replication
@@ -108,6 +109,9 @@ public class ReplicationSourceManager {
         new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
     List<String> otherRSs =
         this.zkHelper.getRegisteredRegionServers();
+    this.zkHelper.registerRegionServerListener(
+        new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
+    this.zkHelper.listPeersIdsAndWatch();
     this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
   }
 
@@ -144,8 +148,7 @@ public class ReplicationSourceManager {
    */
   public void init() throws IOException {
     for (String id : this.zkHelper.getPeerClusters().keySet()) {
-      ReplicationSourceInterface src = addSource(id);
-      src.startup();
+      addSource(id);
     }
     List<String> currentReplicators = this.zkHelper.getRegisteredRegionServers();
     if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -168,20 +171,24 @@ public class ReplicationSourceManager {
   /**
    * Add a new normal source to this region server
    * @param id the id of the peer cluster
-   * @return the created source
+   * @return the source that was created
    * @throws IOException
    */
   public ReplicationSourceInterface addSource(String id) throws IOException {
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
-    this.sources.add(src);
+    // TODO set it to what's in ZK
+    src.setSourceEnabled(true);
     synchronized (this.hlogs) {
+      this.sources.add(src);
       if (this.hlogs.size() > 0) {
-        this.zkHelper.addLogToList(this.hlogs.first(),
+        // Add the latest hlog to that source's queue
+        this.zkHelper.addLogToList(this.hlogs.last(),
             this.sources.get(0).getPeerClusterZnode());
         src.enqueueLog(this.latestPath);
       }
     }
+    src.startup();
     return src;
   }
 
@@ -193,7 +200,7 @@ public class ReplicationSourceManager {
       this.zkHelper.deleteOwnRSZNode();
     }
     for (ReplicationSourceInterface source : this.sources) {
-      source.terminate();
+      source.terminate("Region server is closing");
     }
   }
 
@@ -214,6 +221,11 @@ public class ReplicationSourceManager {
   }
 
   void logRolled(Path newLog) {
+    if (!this.replicating.get()) {
+      LOG.warn("Replication stopped, won't add new log");
+      return;
+    }
+    
     if (this.sources.size() > 0) {
       this.zkHelper.addLogToList(newLog.getName(),
           this.sources.get(0).getPeerClusterZnode());
@@ -300,10 +312,16 @@ public class ReplicationSourceManager {
       try {
         ReplicationSourceInterface src = getReplicationSource(this.conf,
             this.fs, this, this.stopper, this.replicating, peerId);
+        if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+          src.terminate("Recovered queue doesn't belong to any current peer");
+          break;
+        }
         this.oldsources.add(src);
         for (String hlog : entry.getValue()) {
           src.enqueueLog(new Path(this.oldLogDir, hlog));
         }
+        // TODO set it to what's in ZK
+        src.setSourceEnabled(true);
         src.startup();
       } catch (IOException e) {
         // TODO manage it
@@ -319,7 +337,46 @@ public class ReplicationSourceManager {
   public void closeRecoveredQueue(ReplicationSourceInterface src) {
     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
     this.oldsources.remove(src);
-    this.zkHelper.deleteSource(src.getPeerClusterZnode());
+    this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
+  }
+
+  /**
+   * This method first deletes all the recovered sources for the specified
+   * id, then deletes the normal source (deleting all related data in ZK).
+   * @param id The id of the peer cluster
+   */
+  public void removePeer(String id) {
+    LOG.info("Closing the following queue " + id + ", currently have "
+        + sources.size() + " and another "
+        + oldsources.size() + " that were recovered");
+    ReplicationSourceInterface srcToRemove = null;
+    List<ReplicationSourceInterface> oldSourcesToDelete =
+        new ArrayList<ReplicationSourceInterface>();
+    // First close all the recovered sources for this peer
+    for (ReplicationSourceInterface src : oldsources) {
+      if (id.equals(src.getPeerClusterId())) {
+        oldSourcesToDelete.add(src);
+      }
+    }
+    for (ReplicationSourceInterface src : oldSourcesToDelete) {
+      closeRecoveredQueue(src);
+    }
+    LOG.info("Number of deleted recovered sources for " + id + ": "
+        + oldSourcesToDelete.size());
+    // Now look for the one on this cluster
+    for (ReplicationSourceInterface src : this.sources) {
+      if (id.equals(src.getPeerClusterId())) {
+        srcToRemove = src;
+        break;
+      }
+    }
+    if (srcToRemove == null) {
+      LOG.error("The queue we wanted to close is missing " + id);
+      return;
+    }
+    srcToRemove.terminate("Replication stream was removed by a user");
+    this.sources.remove(srcToRemove);
+    this.zkHelper.deleteSource(id, true);
   }
 
   /**
@@ -354,8 +411,7 @@ public class ReplicationSourceManager {
         return;
       }
       LOG.info(path + " znode expired, trying to lock it");
-      String[] rsZnodeParts = path.split("/");
-      transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+      transferQueues(zkHelper.getZNodeName(path));
     }
 
     /**
@@ -384,6 +440,70 @@ public class ReplicationSourceManager {
   }
 
   /**
+   * Watcher used to follow the creation and deletion of peer clusters.
+   */
+  public class PeersWatcher extends ZooKeeperListener {
+
+    /**
+     * Construct a ZooKeeper event listener.
+     */
+    public PeersWatcher(ZooKeeperWatcher watcher) {
+      super(watcher);
+    }
+
+    /**
+     * Called when a node has been deleted
+     * @param path full path of the deleted node
+     */
+    public void nodeDeleted(String path) {
+      List<String> peers = refreshPeersList(path);
+      if (peers == null) {
+        return;
+      }
+      String id = zkHelper.getZNodeName(path);
+      removePeer(id);
+    }
+
+    /**
+     * Called when an existing node has a child node added or removed.
+     * @param path full path of the node whose children have changed
+     */
+    public void nodeChildrenChanged(String path) {
+      List<String> peers = refreshPeersList(path);
+      if (peers == null) {
+        return;
+      }
+      for (String id : peers) {
+        try {
+          boolean added = zkHelper.connectToPeer(id);
+          if (added) {
+            addSource(id);
+          }
+        } catch (IOException e) {
+          // TODO manage better than that ?
+          LOG.error("Error while adding a new peer", e);
+        } catch (KeeperException e) {
+          LOG.error("Error while adding a new peer", e);
+        }
+      }
+    }
+
+    /**
+     * Verify if this event is meant for us, and if so then get the latest
+     * peers' list from ZK. Also reset the watches.
+     * @param path path to check against
+     * @return A list of peers' identifiers if the event concerns this watcher,
+     * else null.
+     */
+    private List<String> refreshPeersList(String path) {
+      if (!path.startsWith(zkHelper.getPeersZNode())) {
+        return null;
+      }
+      return zkHelper.listPeersIdsAndWatch();
+    }
+  }
+
+  /**
    * Get the directory where hlogs are archived
    * @return the directory where hlogs are archived
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Wed Oct 20 00:21:08 2010
@@ -276,6 +276,27 @@ public class ZKUtil {
   }
 
   /**
+   * List all the children of the specified znode, setting a watch for children
+   * changes and also setting a watch on every individual child in order to get
+   * the NodeCreated and NodeDeleted events.
+   * @param zkw zookeeper reference
+   * @param znode node to get children of and watch
+   * @return list of znode names, null if the node doesn't exist
+   * @throws KeeperException
+   */
+  public static List<String> listChildrenAndWatchThem(ZooKeeperWatcher zkw, 
+      String znode) throws KeeperException {
+    List<String> children = listChildrenAndWatchForNewChildren(zkw, znode);
+    if (children == null) {
+      return null;
+    }
+    for (String child : children) {
+      watchAndCheckExists(zkw, joinZNode(znode, child));
+    }
+    return children;
+  }
+
+  /**
    * Lists the children of the specified znode, retrieving the data of each
    * child as a server address.
    *
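
For context, listChildrenAndWatchThem() is what gives the replication code
per-child NodeCreated/NodeDeleted events in addition to the children-changed
watch on the parent znode. A minimal sketch of a listener built around it,
loosely modeled on the PeersWatcher added to ReplicationSourceManager above;
the class and field names are hypothetical:

    import java.util.List;

    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
    import org.apache.zookeeper.KeeperException;

    public class ChildWatcherSketch extends ZooKeeperListener {
      private final ZooKeeperWatcher zkw;
      private final String znode;

      public ChildWatcherSketch(ZooKeeperWatcher watcher, String znode) {
        super(watcher);
        this.zkw = watcher;
        this.znode = znode;
      }

      public void nodeChildrenChanged(String path) {
        if (!path.startsWith(znode)) {
          return;
        }
        try {
          // Re-arms the children watch on the parent and an exists watch on
          // every child, so NodeCreated/NodeDeleted fire for individual peers.
          List<String> children = ZKUtil.listChildrenAndWatchThem(zkw, znode);
          System.out.println("Children of " + znode + ": " + children);
        } catch (KeeperException e) {
          // The replication code routes this through an Abortable instead.
          e.printStackTrace();
        }
      }
    }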

Modified: hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (original)
+++ hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html Wed Oct 20 00:21:08 2010
@@ -47,7 +47,8 @@ features:
     <li>Replication of scoped families in user tables.</li>
     <li>Start/stop replication stream.</li>
     <li>Supports clusters of different sizes.</li>
-    <li>Handling of partitions longer than 10 minutes</li>
+    <li>Handling of partitions longer than 10 minutes.</li>
+    <li>Ability to add/remove slave clusters at runtime.</li>
 </ol>
 Please report bugs on the project's Jira when found.
 <p>
@@ -80,7 +81,7 @@ Before trying out replication, make sure
 <p>
 
 The following steps describe how to enable replication from a cluster
-to another. This must be done with both clusters offlined.
+to another.
 <ol>
     <li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both cluster to add
     the following configurations:
@@ -90,15 +91,13 @@ to another. This must be done with both 
   &lt;value&gt;true&lt;/value&gt;
 &lt;/property&gt;</pre>
     </li>
-    <li>Run the following command on any cluster:
-    <pre>
-$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/bin/replication/add_peer.tb</pre>
+    <li>Run the following command in the master's shell while it's running:
+    <pre>add_peer</pre>
     This will show you the help to setup the replication stream between
     both clusters. If both clusters use the same Zookeeper cluster, you have
     to use a different <b>zookeeper.znode.parent</b> since they can't
     write in the same folder.
     </li>
-    <li>You can now start and stop the clusters with your preferred method.</li>
 </ol>
 
 You can confirm that your setup works by looking at any region server's log
@@ -115,12 +114,11 @@ was chosen for replication.<br><br>
 Should you want to stop the replication while the clusters are running, open
 the shell on the master cluster and issue this command:
 <pre>
-hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
+hbase(main):001:0> stop_replication</pre>
 
-Where you replace the znode parent with the one configured on your master
-cluster. Replication of already queued edits will still happen after you
+Replication of already queued edits will still happen after you
 issued that command but new entries won't be. To start it back, simply replace
-"false" with "true" in the command.
+"false" with "true" in the command. 
 
 <p>
 

Modified: hbase/trunk/src/main/ruby/hbase.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase.rb (original)
+++ hbase/trunk/src/main/ruby/hbase.rb Wed Oct 20 00:21:08 2010
@@ -72,3 +72,4 @@ end
 require 'hbase/hbase'
 require 'hbase/admin'
 require 'hbase/table'
+require 'hbase/replication_admin'

Modified: hbase/trunk/src/main/ruby/hbase/hbase.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/hbase.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/hbase.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/hbase.rb Wed Oct 20 00:21:08 2010
@@ -49,5 +49,10 @@ module Hbase
     def table(table, formatter)
       ::Hbase::Table.new(configuration, table, formatter)
     end
+
+    def replication_admin(formatter)
+      ::Hbase::RepAdmin.new(configuration, formatter)
+    end
+
   end
 end

Added: hbase/trunk/src/main/ruby/hbase/replication_admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/replication_admin.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/replication_admin.rb (added)
+++ hbase/trunk/src/main/ruby/hbase/replication_admin.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,72 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+include Java
+
+java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+
+# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+
+module Hbase
+  class RepAdmin
+    include HBaseConstants
+
+    def initialize(configuration, formatter)
+      @replication_admin = ReplicationAdmin.new(configuration)
+      @formatter = formatter
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Add a new peer cluster to replicate to
+    def add_peer(id, cluster_key)
+      @replication_admin.addPeer(id, cluster_key)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Remove a peer cluster, stops the replication
+    def remove_peer(id)
+      @replication_admin.removePeer(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Restart the replication stream to the specified peer
+    def enable_peer(id)
+      @replication_admin.enablePeer(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Stop the replication stream to the specified peer
+    def disable_peer(id)
+      @replication_admin.disablePeer(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Restart the replication; the state of the streams will be unknown
+    def start_replication
+      @replication_admin.setReplicating(true)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Kill switch for replication, stops all its features
+    def stop_replication
+      @replication_admin.setReplicating(false)
+    end
+  end
+end

Modified: hbase/trunk/src/main/ruby/shell.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell.rb (original)
+++ hbase/trunk/src/main/ruby/shell.rb Wed Oct 20 00:21:08 2010
@@ -83,6 +83,10 @@ module Shell
       hbase.table(name, formatter)
     end
 
+    def hbase_replication_admin
+      @hbase_replication_admin ||= hbase.replication_admin(formatter)
+    end
+
     def export_commands(where)
       ::Shell.commands.keys.each do |cmd|
         where.send :instance_eval, <<-EOF
@@ -254,3 +258,17 @@ Shell.load_command_group(
   ]
 )
 
+Shell.load_command_group(
+  'replication',
+  :full_name => 'CLUSTER REPLICATION TOOLS',
+  :comment => "In order to use these tools, hbase.replication must be true. enabling/disabling is currently unsupported",
+  :commands => %w[
+    add_peer
+    remove_peer
+    enable_peer
+    disable_peer
+    start_replication
+    stop_replication
+  ]
+)
+

Modified: hbase/trunk/src/main/ruby/shell/commands.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands.rb (original)
+++ hbase/trunk/src/main/ruby/shell/commands.rb Wed Oct 20 00:21:08 2010
@@ -49,6 +49,10 @@ module Shell
         shell.hbase_table(name)
       end
 
+      def replication_admin
+        shell.hbase_replication_admin
+      end
+
       #----------------------------------------------------------------------
 
       def formatter

Added: hbase/trunk/src/main/ruby/shell/commands/add_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/add_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/add_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/add_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class AddPeer < Command
+      def help
+        return <<-EOF
+          Add a peer cluster to replicate to. The id must be a short integer
+          and the cluster key is composed like this:
+          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+          This gives the full path HBase needs to connect to the other cluster.
+          Examples:
+
+            hbase> add_peer '1', "server1.cie.com:2181:/hbase"
+            hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
+        EOF
+      end
+
+      def command(id, cluster_key)
+        format_simple_command do
+          replication_admin.add_peer(id, cluster_key)
+        end
+      end
+    end
+  end
+end
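
The cluster key that add_peer expects is simply the slave's ZooKeeper quorum, client port and znode parent joined with colons. A minimal sketch of composing it, assuming the slave cluster's hbase-site.xml is on the classpath (the class name is invented; TestReplication further down builds the key the same way from conf2):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class ClusterKeySketch {
      public static void main(String[] args) {
        // Configuration of the slave cluster we want to replicate to.
        Configuration slaveConf = HBaseConfiguration.create();
        String clusterKey = slaveConf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
            + slaveConf.get("hbase.zookeeper.property.clientPort") + ":"
            + slaveConf.get("zookeeper.znode.parent");
        // Yields something like "zk1,zk2,zk3:2181:/hbase", ready to pass to add_peer.
        System.out.println(clusterKey);
      }
    }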

Added: hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class DisablePeer < Command
+      def help
+        return <<-EOF
+          Stops the replication stream to the specified cluster, but still
+          keeps track of new edits to replicate.
+
+          CURRENTLY UNSUPPORTED
+
+          Examples:
+
+            hbase> disable_peer '1'
+        EOF
+      end
+
+      def command(id)
+        format_simple_command do
+          replication_admin.disable_peer(id)
+        end
+      end
+    end
+  end
+end

Added: hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class EnablePeer < Command
+      def help
+        return <<-EOF
+          Restarts the replication to the specified peer cluster,
+          continuing from where it was disabled.
+
+          CURRENTLY UNSUPPORTED
+
+          Examples:
+
+            hbase> enable_peer '1'
+        EOF
+      end
+
+      def command(id)
+        format_simple_command do
+          replication_admin.enable_peer(id)
+        end
+      end
+    end
+  end
+end

Added: hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class RemovePeer < Command
+      def help
+        return <<-EOF
+          Stops the specified replication stream and deletes all the meta
+          information kept about it.
+          Examples:
+
+            hbase> remove_peer '1'
+        EOF
+      end
+
+      def command(id)
+        format_simple_command do
+          replication_admin.remove_peer(id)
+        end
+      end
+    end
+  end
+end

Added: hbase/trunk/src/main/ruby/shell/commands/start_replication.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/start_replication.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/start_replication.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/start_replication.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class StartReplication < Command
+      def help
+        return <<-EOF
+          Restarts all the replication features. The state in which each
+          stream starts is undetermined.
+          WARNING:
+          start/stop replication is only meant to be used in critical load situations.
+          Examples:
+
+            hbase> start_replication
+        EOF
+      end
+
+      def command
+        format_simple_command do
+          replication_admin.start_replication
+        end
+      end
+    end
+  end
+end

Added: hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class StopReplication < Command
+      def help
+        return <<-EOF
+          Stops all the replication features. The state in which each
+          stream stops is undetermined.
+          WARNING:
+          start/stop replication is only meant to be used in critical load situations.
+          Examples:
+
+            hbase> stop_replication
+        EOF
+      end
+
+      def command
+        format_simple_command do
+          replication_admin.stop_replication
+        end
+      end
+    end
+  end
+end

Modified: hbase/trunk/src/site/xdoc/replication.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/site/xdoc/replication.xml?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/site/xdoc/replication.xml (original)
+++ hbase/trunk/src/site/xdoc/replication.xml Wed Oct 20 00:21:08 2010
@@ -31,7 +31,7 @@
       <p>
         HBase replication is a way to copy data between HBase deployments. It
         can serve as a disaster recovery solution and can contribute to provide
-        higher availability at HBase layer. It can also serve more practically;
+        higher availability at the HBase layer. It can also serve more practically;
         for example, as a way to easily copy edits from a web-facing cluster to a "MapReduce"
         cluster which will process old and new data and ship back the results
         automatically.
@@ -97,10 +97,10 @@
         </p>
         <p>
           Synchronously, the region server that receives the edits reads them
-          sequentially and applies them on its own cluster using the HBase client
-          (HTables managed by a HTablePool) automatically. If consecutive rows
-          belong to the same table, they are inserted together in order to
-          leverage parallel insertions.
+          sequentially and separates each of them into buffers, one per table.
+          Once all edits are read, each buffer is flushed using the normal HBase
+          client (HTables managed by an HTablePool). This is done in order to
+          leverage parallel insertion (MultiPut).
         </p>
         <p>
           Back in the master cluster's region server, the offset for the current
@@ -221,23 +221,6 @@
           10 times until trying to find a different sink.
         </p>
       </section>
-      <section name="Applying edits">
-        <p>
-          The sink synchronously applies the edits to its local cluster using
-          the native client API. This method is also synchronized between every
-          incoming sources, which means that only one batch of log entries can be
-          replicated at a time by each slave region server.
-        </p>
-        <p>
-          The sink applies the edits one by one, unless it's able to batch
-          sequential Puts that belong to the same table in order to use the
-          parallel puts feature of HConnectionManager. The Put and Delete objects
-          are recreated by inspecting the incoming WALEdit objects and are
-          with the exact same row, family, qualifier, timestamp, and value (for
-          Put). Note that if the master and slave cluster don't have the same
-          time, time-related issues may occur.
-        </p>
-      </section>
       <section name="Cleaning logs">
         <p>
           If replication isn't enabled, the master's logs cleaning thread will
@@ -402,10 +385,6 @@
       </p>
       <ol>
         <li>
-            HBASE-2688, master-master replication is disabled in the code, we need
-            to enable and test it.
-        </li>
-        <li>
             HBASE-2611, basically if a region server dies while recovering the
             queues of another dead RS, we will miss the data from the queues
             that weren't copied.
@@ -419,9 +398,8 @@
             carry that data and check it.
         </li>
         <li>
-            HBASE-2200, currently all the replication operations (adding or removing
-            streams for example) are done only when the clusters are offline. This
-            should be possible at runtime.
+            HBASE-3130, the master cluster needs to be restarted if its region
+            servers lose their session with a slave cluster.
         </li>
       </ol>
     </section>
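
One of the hunks above rewrites the sink-side description: incoming edits are sorted into one buffer per table, and each buffer is then flushed through the normal client to get batched (MultiPut) insertion. Below is a much-simplified sketch of that idea, not the actual ReplicationSink code; the class, the method signature, and the assumption that edits arrive already converted to Puts paired with table names are for illustration only:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.HTablePool;
    import org.apache.hadoop.hbase.client.Put;

    public class PerTableBufferSketch {
      private final HTablePool pool;

      public PerTableBufferSketch(Configuration conf) {
        // HTables are managed by an HTablePool, as described above; pool size is arbitrary.
        this.pool = new HTablePool(conf, 10);
      }

      public void replicate(List<String> tableNames, List<Put> puts) throws IOException {
        // 1. Sort each edit into a buffer keyed by its table: one buffer per table.
        Map<String, List<Put>> buffers = new HashMap<String, List<Put>>();
        for (int i = 0; i < puts.size(); i++) {
          List<Put> buffer = buffers.get(tableNames.get(i));
          if (buffer == null) {
            buffer = new ArrayList<Put>();
            buffers.put(tableNames.get(i), buffer);
          }
          buffer.add(puts.get(i));
        }
        // 2. Once all edits are read, flush each buffer with one batched put per table.
        for (Map.Entry<String, List<Put>> entry : buffers.entrySet()) {
          HTableInterface table = pool.getTable(entry.getKey());
          try {
            table.put(entry.getValue());
          } finally {
            pool.putTable(table);   // give the table back to the pool
          }
        }
      }
    }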

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java Wed Oct 20 00:21:08 2010
@@ -0,0 +1,92 @@
+package org.apache.hadoop.hbase.client.replication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit testing of ReplicationAdmin
+ */
+public class TestReplicationAdmin {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestReplicationAdmin.class);
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private final String ID_ONE = "1";
+  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
+  private final String ID_SECOND = "2";
+  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
+
+  private static ReplicationSourceManager manager;
+  private static ReplicationAdmin admin;
+  private static AtomicBoolean replicating = new AtomicBoolean(true);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    admin = new ReplicationAdmin(conf);
+    Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    Path logDir = new Path(TEST_UTIL.getTestDir(),
+        HConstants.HREGION_LOGDIR_NAME);
+    manager = new ReplicationSourceManager(admin.getReplicationZk(),
+        conf, null, FileSystem.get(conf), replicating, logDir, oldLogDir);
+  }
+
+  /**
+ * Simple testing of adding and removing peers; basically shows that
+   * all interactions with ZK work
+   * @throws Exception
+   */
+  @Test
+  public void testAddRemovePeer() throws Exception {
+    assertEquals(0, manager.getSources().size());
+    // Add a valid peer
+    admin.addPeer(ID_ONE, KEY_ONE);
+    // Try adding the same peer again (should fail)
+    try {
+      admin.addPeer(ID_ONE, KEY_ONE);
+      fail();
+    } catch (IllegalArgumentException iae) {
+      // OK!
+    }
+    assertEquals(1, admin.getPeersCount());
+    // Try to remove a nonexistent peer
+    try {
+      admin.removePeer(ID_SECOND);
+      fail();
+    } catch (IllegalArgumentException iae) {
+      // OK!
+    }
+    assertEquals(1, admin.getPeersCount());
+    // Add a second peer; fails since multi-slave isn't supported yet
+    try {
+      admin.addPeer(ID_SECOND, KEY_SECOND);
+      fail();
+    } catch (IllegalStateException iae) {
+      // OK!
+    }
+    assertEquals(1, admin.getPeersCount());
+    // Remove the first peer we added
+    admin.removePeer(ID_ONE);
+    assertEquals(0, admin.getPeersCount());
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Wed Oct 20 00:21:08 2010
@@ -63,7 +63,12 @@ public class ReplicationSourceDummy impl
   }
 
   @Override
-  public void terminate() {
+  public void terminate(String reason) {
+
+  }
+
+  @Override
+  public void terminate(String reason, Exception e) {
 
   }
 
@@ -71,4 +76,15 @@ public class ReplicationSourceDummy impl
   public String getPeerClusterZnode() {
     return peerClusterId;
   }
+
+  @Override
+  public String getPeerClusterId() {
+    return peerClusterId;
+  }
+
+  @Override
+  public void setSourceEnabled(boolean status) {
+
+  }
+
 }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Wed Oct 20 00:21:08 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -59,6 +60,9 @@ public class TestReplication {
   private static ZooKeeperWatcher zkw1;
   private static ZooKeeperWatcher zkw2;
 
+  private static ReplicationAdmin admin;
+  private static String slaveClusterKey;
+
   private static HTable htable1;
   private static HTable htable2;
 
@@ -92,16 +96,12 @@ public class TestReplication {
     conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     conf1.setBoolean("dfs.support.append", true);
     conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
     zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
-    ZKUtil.createWithParents(zkw1, "/1/replication/master");
-    ZKUtil.createWithParents(zkw1, "/1/replication/state");
-    ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes(
-        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf1.get("hbase.zookeeper.property.clientPort")+":/1"));
-    setIsReplication(true);
+    admin = new ReplicationAdmin(conf1);
     LOG.info("Setup first Zk");
 
     conf2 = HBaseConfiguration.create();
@@ -114,12 +114,11 @@ public class TestReplication {
     utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
-    ZKUtil.createWithParents(zkw2, "/2/replication");
 
-    ZKUtil.createWithParents(zkw1, "/1/replication/peers/2");
-    ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes(
-        conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf2.get("hbase.zookeeper.property.clientPort")+":/2"));
+    slaveClusterKey = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+            conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+    admin.addPeer("2", slaveClusterKey);
+    setIsReplication(true);
 
     LOG.info("Setup second Zk");
 
@@ -145,9 +144,7 @@ public class TestReplication {
 
   private static void setIsReplication(boolean rep) throws Exception {
     LOG.info("Set rep " + rep);
-    ZKUtil.setData(zkw1,"/1/replication/state",
-        Bytes.toBytes(Boolean.toString(rep)));
-    // Takes some ms for ZK to fire the watcher
+    admin.setReplicating(rep);
     Thread.sleep(SLEEP_TIME);
   }
 
@@ -156,12 +153,31 @@ public class TestReplication {
    */
   @Before
   public void setUp() throws Exception {
-    setIsReplication(false);
     utility1.truncateTable(tableName);
-    utility2.truncateTable(tableName);
-    // If test is flaky, set that sleep higher
-    Thread.sleep(SLEEP_TIME*8);
-    setIsReplication(true);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call truncateTable on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment the retry count if we are making progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
   }
 
   /**
@@ -186,13 +202,12 @@ public class TestReplication {
     htable1 = new HTable(conf1, tableName);
     htable1.put(put);
 
-    HTable table2 = new HTable(conf2, tableName);
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for put replication");
       }
-      Result res = table2.get(get);
+      Result res = htable2.get(get);
       if (res.size() == 0) {
         LOG.info("Row not available");
         Thread.sleep(SLEEP_TIME);
@@ -205,13 +220,12 @@ public class TestReplication {
     Delete del = new Delete(row);
     htable1.delete(del);
 
-    table2 = new HTable(conf2, tableName);
     get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for del replication");
       }
-      Result res = table2.get(get);
+      Result res = htable2.get(get);
       if (res.size() >= 1) {
         LOG.info("Row not deleted");
         Thread.sleep(SLEEP_TIME);
@@ -334,6 +348,59 @@ public class TestReplication {
   }
 
   /**
+   * Integration test for TestReplicationAdmin, removes and re-add a peer
+   * cluster
+   * @throws Exception
+   */
+  @Test
+  public void testAddAndRemoveClusters() throws Exception {
+    LOG.info("testAddAndRemoveClusters");
+    admin.removePeer("2");
+    Thread.sleep(SLEEP_TIME);
+    byte[] rowKey = Bytes.toBytes("Won't be replicated");
+    Put put = new Put(rowKey);
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES-1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Not supposed to be replicated");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    admin.addPeer("2", slaveClusterKey);
+    Thread.sleep(SLEEP_TIME);
+    rowKey = Bytes.toBytes("do rep");
+    put = new Put(rowKey);
+    put.add(famName, row, row);
+    LOG.info("Adding new row");
+    htable1.put(put);
+
+    get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME*i);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+  }
+
+  /**
    * Do a more intense version testSmallBatch, one  that will trigger
    * hlog rolling and other non-trivial code paths
    * @throws Exception

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Wed Oct 20 00:21:08 2010
@@ -64,8 +64,6 @@ public class TestReplicationSourceManage
 
   private static HBaseTestingUtility utility;
 
-  private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
-
   private static Replication replication;
 
   private static ReplicationSourceManager manager;
@@ -105,14 +103,12 @@ public class TestReplicationSourceManage
 
     zkw = new ZooKeeperWatcher(conf, "test", null);
     ZKUtil.createWithParents(zkw, "/hbase/replication");
-    ZKUtil.createWithParents(zkw, "/hbase/replication/master");
     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
-    ZKUtil.setData(zkw, "/hbase/replication/master",
-        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-        conf.get("hbase.zookeeper.property.clientPort")+":/1"));
     ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
           conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
           conf.get("hbase.zookeeper.property.clientPort")+":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
 
     replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
     manager = replication.getReplicationManager();