You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/05/18 02:00:21 UTC

svn commit: r1484031 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/m...

Author: jdcryans
Date: Sat May 18 00:00:20 2013
New Revision: 1484031

URL: http://svn.apache.org/r1484031
Log:
HBASE-7567  [replication] Create an interface for replication peers (Chris Trezzo via JD)

Added:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Sat May 18 00:00:20 2013
@@ -25,10 +25,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -79,7 +84,7 @@ public class ReplicationPeer implements 
    */
   public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
       throws KeeperException {
-    ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode);
+    ensurePeerEnabled(zookeeper, peerStateNode);
     this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
     this.peerStateTracker.start();
     try {
@@ -90,7 +95,7 @@ public class ReplicationPeer implements 
   }
 
   private void readPeerStateZnode() throws DeserializationException {
-    this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
+    peerEnabled.set(isStateEnabled(peerStateTracker.getData(false))); // sync flag from tracker
   }
 
   /**
@@ -182,6 +187,57 @@ public class ReplicationPeer implements 
   }
 
   /**
+   * Checks whether the content of a peer-state znode is the ENABLED state.
+   * @param bytes raw content of a peer-state znode
+   * @return true if <code>bytes</code> deserialize to ENABLED, false for any other state
+   * @throws DeserializationException if <code>bytes</code> cannot be parsed as a state
+   */
+  private static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    return parseStateFrom(bytes) == ZooKeeperProtos.ReplicationState.State.ENABLED;
+  }
+
+  /**
+   * Deserializes the content of a peer-state znode.
+   * @param bytes content of a state znode: the pb magic prefix followed by a ReplicationState
+   * @return the state carried by <code>bytes</code>
+   * @throws DeserializationException if the magic prefix check fails or the protobuf is invalid
+   */
+  private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    final int prefixLength = ProtobufUtil.lengthOfPBMagic();
+    final int payloadLength = bytes.length - prefixLength;
+    try {
+      return ZooKeeperProtos.ReplicationState.newBuilder()
+          .mergeFrom(bytes, prefixLength, payloadLength)
+          .build().getState();
+    } catch (InvalidProtocolBufferException ipbe) {
+      throw new DeserializationException(ipbe);
+    }
+  }
+
+  /**
+   * Ensures the peer-state znode exists, creating it with the ENABLED state when it is missing.
+   * @param zookeeper watcher used to check for and create the znode
+   * @param path path of the peer-state znode to check
+   * @return true only if this call created the znode; false if it already existed, including
+   *         when someone else created it between the existence check and our create attempt
+   * @throws KeeperException on ZooKeeper errors other than the znode already existing
+   */
+  private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+      throws NodeExistsException, KeeperException {
+    if (ZKUtil.checkExists(zookeeper, path) == -1) {
+      // ReplicationPeersZKImpl#addPeer also creates this znode when a peer is added, so the two
+      // can race. createNodeIfNotExistsAndWatch tolerates the znode appearing concurrently and
+      // reports whether this call created it, so return that instead of assuming success.
+      // The peer state data is set as "ENABLED" by default.
+      return ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    }
+    return false;
+  }
+
+  /**
    * Tracker for state of this peer
    */
   public class PeerStateTracker extends ZooKeeperNodeTracker {

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java?rev=1484031&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java Sat May 18 00:00:20 2013
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
+ * clusters that data is replicated to. A peer cluster can be in three different states:
+ *
+ * 1. Not-Registered - There is no notion of the peer cluster.
+ * 2. Registered - The peer has an id and is being tracked but there is no connection.
+ * 3. Connected - There is an active connection to the remote peer.
+ *
+ * In the registered or connected state, a peer cluster can either be enabled or disabled.
+ */
+@InterfaceAudience.Private
+public interface ReplicationPeers {
+
+  /**
+   * Initialize the ReplicationPeers interface.
+   * @throws KeeperException
+   */
+  void init() throws IOException, KeeperException;
+
+  /**
+   * Add a new remote slave cluster for replication.
+   * @param peerId a String that identifies the peer cluster
+   * @param clusterKey the concatenation of the slave cluster's:
+   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+   */
+  void addPeer(String peerId, String clusterKey) throws IOException;
+
+  /**
+   * Removes a remote slave cluster and stops the replication to it.
+   * @param peerId a String that identifies the peer cluster
+   */
+  void removePeer(String peerId) throws IOException;
+
+  /**
+   * Restart the replication to the specified remote slave cluster.
+   * @param peerId a String that identifies the peer cluster
+   */
+  void enablePeer(String peerId) throws IOException;
+
+  /**
+   * Stop the replication to the specified remote slave cluster.
+   * @param peerId a String that identifies the peer cluster
+   */
+  void disablePeer(String peerId) throws IOException;
+
+  /**
+   * Get the replication status for the specified connected remote slave cluster.
+   * @param peerId a String that identifies a connected peer cluster
+   * @return true if replication is enabled, false otherwise.
+   */
+  boolean getStatusOfConnectedPeer(String peerId);
+
+  /**
+   * Get a set of all connected remote slave clusters.
+   * @return set of peer ids
+   */
+  Set<String> getConnectedPeers();
+
+  /**
+   * List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
+   * connected/disconnected).
+   * @return A map of peer ids to peer cluster keys
+   */
+  Map<String, String> getAllPeerClusterKeys();
+
+  /**
+   * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
+   * connected/disconnected).
+   * @return A list of peer ids
+   */
+  List<String> getAllPeerIds();
+
+  /**
+   * Attempt to connect to a new remote slave cluster.
+   * @param peerId a String that identifies a registered peer cluster
+   * @return true if a new connection was made, false if no new connection was made.
+   */
+  boolean connectToPeer(String peerId) throws IOException, KeeperException;
+
+  /**
+   * Disconnect from a remote slave cluster.
+   * @param peerId a String that identifies a connected peer cluster
+   */
+  void disconnectFromPeer(String peerId);
+
+  /**
+   * Returns all region servers from given connected remote slave cluster.
+   * @param peerId a String that identifies a connected peer cluster
+   * @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
+   *         cluster is unavailable or there are no region servers in the cluster.
+   */
+  List<ServerName> getRegionServersOfConnectedPeer(String peerId);
+
+  /**
+   * Returns the UUID of the peer cluster identified by the provided peer id.
+   * @param peerId a String that identifies a connected peer cluster (the id is not a UUID itself)
+   * @return a UUID or null if the peer cluster does not exist or is not connected.
+   */
+  UUID getPeerUUID(String peerId);
+
+  /**
+   * Returns the configuration needed to talk to the remote slave cluster.
+   * @param peerId a String that identifies the peer cluster
+   * @return the configuration for the peer cluster, null if it was unable to get the configuration
+   */
+  Configuration getPeerConf(String peerId) throws KeeperException;
+}
\ No newline at end of file

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1484031&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Sat May 18 00:00:20 2013
@@ -0,0 +1,469 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
+ * peers znode contains a list of all peer replication clusters and the current replication state of
+ * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
+ * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
+ * peer's cluster key provided by the user in the HBase shell. The cluster key contains a list of
+ * ZooKeeper quorum peers, the client port for the ZooKeeper quorum, and the base znode for HBase.
+ * For example:
+ *
+ *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
+ *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
+ *
+ * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
+ * on that peer cluster. These peer-state znodes do not have child znodes and simply contain the
+ * serialized state (ENABLED or DISABLED). This value is read/maintained by the
+ * ReplicationPeer.PeerStateTracker class. For example:
+ *
+ * /hbase/replication/peers/1/peer-state [Value: ENABLED]
+ *
+ * NOTE(review): connected peers are cached in a plain HashMap without synchronization here;
+ * confirm that callers confine each instance to a single thread.
+ */
+public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
+
+  // Map of connected peer clusters keyed by their id
+  private final Map<String, ReplicationPeer> peerClusters;
+
+  private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
+
+  /**
+   * @param zk watcher for the ZooKeeper ensemble holding the replication znodes
+   * @param conf local configuration, used as the base of every peer's configuration
+   * @param abortable aborted when the list of peers cannot be read from ZooKeeper
+   */
+  public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
+      Abortable abortable) {
+    super(zk, conf, abortable);
+    this.peerClusters = new HashMap<String, ReplicationPeer>();
+  }
+
+  /**
+   * Creates the peers znode if needed, then connects to every peer already registered under it.
+   */
+  @Override
+  public void init() throws IOException, KeeperException {
+    ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+    connectExistingPeers();
+  }
+
+  /**
+   * Registers a peer by creating its znode (holding the pb-serialized cluster key) and its
+   * peer-state znode (holding ENABLED).
+   * @throws IllegalArgumentException if a peer with this id is already registered
+   */
+  @Override
+  public void addPeer(String id, String clusterKey) throws IOException {
+    try {
+      if (peerExists(id)) {
+        throw new IllegalArgumentException("Cannot add existing peer");
+      }
+      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
+        toByteArray(clusterKey));
+      // A peer is enabled by default. ReplicationPeer#startStateTracker also creates the
+      // peer-state znode when it is missing, so the two may race; the "if not exists" variant
+      // tolerates the znode already being present.
+      ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
+        ENABLED_ZNODE_BYTES);
+    } catch (KeeperException e) {
+      throw new IOException("Unable to add peer", e);
+    }
+  }
+
+  /**
+   * Unregisters a peer by recursively deleting its znode, including its peer-state znode.
+   * @throws IllegalArgumentException if no peer with this id is registered
+   */
+  @Override
+  public void removePeer(String id) throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("Cannot remove inexisting peer");
+      }
+      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+    } catch (KeeperException e) {
+      throw new IOException("Unable to remove a peer", e);
+    }
+  }
+
+  @Override
+  public void enablePeer(String id) throws IOException {
+    changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
+    LOG.info("peer " + id + " is enabled");
+  }
+
+  @Override
+  public void disablePeer(String id) throws IOException {
+    changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
+    LOG.info("peer " + id + " is disabled");
+  }
+
+  /**
+   * @throws IllegalArgumentException if the peer is not currently connected
+   */
+  @Override
+  public boolean getStatusOfConnectedPeer(String id) {
+    ReplicationPeer peer = this.peerClusters.get(id);
+    if (peer == null) {
+      throw new IllegalArgumentException("peer " + id + " is not connected");
+    }
+    return peer.getPeerEnabled().get();
+  }
+
+  /**
+   * Connects to a registered peer. Returns false without connecting if the peer is already
+   * connected, its configuration cannot be built, or its cluster key is our own.
+   */
+  @Override
+  public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+    if (this.peerClusters.containsKey(peerId)) {
+      return false;
+    }
+    ReplicationPeer peer = getPeer(peerId);
+    if (peer == null) {
+      return false;
+    }
+    this.peerClusters.put(peerId, peer);
+    LOG.info("Added new peer cluster " + peer.getClusterKey());
+    return true;
+  }
+
+  /**
+   * Closes the ZooKeeper watcher of a connected peer and drops it; no-op if not connected.
+   */
+  @Override
+  public void disconnectFromPeer(String peerId) {
+    ReplicationPeer rp = this.peerClusters.get(peerId);
+    if (rp != null) {
+      rp.getZkw().close();
+      this.peerClusters.remove(peerId);
+    }
+  }
+
+  /**
+   * Reads the cluster key of every registered peer. Peers whose znode disappears between the
+   * listing and the read, or whose content cannot be parsed, are skipped. On a ZooKeeper error
+   * the abortable is aborted and the keys collected so far are returned.
+   */
+  @Override
+  public Map<String, String> getAllPeerClusterKeys() {
+    Map<String, String> peers = new TreeMap<String, String>();
+    try {
+      List<String> ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      if (ids == null) {
+        // No peers znode, hence no registered peers.
+        return peers;
+      }
+      for (String id : ids) {
+        byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+        if (bytes == null) {
+          // Peer removed after it was listed; parsePeerFrom cannot handle null content.
+          continue;
+        }
+        try {
+          peers.put(id, parsePeerFrom(bytes));
+        } catch (DeserializationException de) {
+          LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.", de);
+        }
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return peers;
+  }
+
+  /**
+   * Fetches the region servers of a connected peer and stores them on the peer. On a ZooKeeper
+   * error the peer's watcher is reloaded if the connection/session was lost, and an empty list is
+   * stored instead.
+   */
+  @Override
+  public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
+    ReplicationPeer peer = this.peerClusters.get(peerId);
+    if (peer == null) {
+      return Collections.emptyList();
+    }
+    List<ServerName> addresses;
+    try {
+      addresses = fetchSlavesAddresses(peer.getZkw());
+    } catch (KeeperException ke) {
+      reconnectPeer(ke, peer);
+      addresses = Collections.emptyList();
+    }
+    peer.setRegionServers(addresses);
+    return peer.getRegionServers();
+  }
+
+  /**
+   * @return the UUID of a connected peer cluster, or null if the peer is not connected or its
+   *         UUID could not be obtained from the peer's ZooKeeper
+   */
+  @Override
+  public UUID getPeerUUID(String peerId) {
+    ReplicationPeer peer = this.peerClusters.get(peerId);
+    if (peer == null) {
+      return null;
+    }
+    UUID peerUUID = null;
+    try {
+      peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
+    } catch (KeeperException ke) {
+      reconnectPeer(ke, peer);
+    }
+    return peerUUID;
+  }
+
+  /**
+   * @return a live view of the connected peer ids, backed by the internal map
+   */
+  @Override
+  public Set<String> getConnectedPeers() {
+    return this.peerClusters.keySet();
+  }
+
+  /**
+   * Builds the configuration of a registered peer by applying its cluster key to a copy of our
+   * configuration.
+   * @return the peer configuration, or null if the peer znode is missing, cannot be parsed, or its
+   *         cluster key cannot be applied
+   */
+  @Override
+  public Configuration getPeerConf(String peerId) throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+    byte[] data = ZKUtil.getData(this.zookeeper, znode);
+    if (data == null) {
+      LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
+      return null;
+    }
+    String otherClusterKey;
+    try {
+      otherClusterKey = parsePeerFrom(data);
+    } catch (DeserializationException e) {
+      LOG.warn("Failed to parse cluster key from peerId=" + peerId
+          + ", specifically the content from the following znode: " + znode, e);
+      return null;
+    }
+
+    Configuration otherConf = new Configuration(this.conf);
+    try {
+      ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+    } catch (IOException e) {
+      LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
+      return null;
+    }
+    return otherConf;
+  }
+
+  /**
+   * List all registered peer clusters and set a watch on their znodes.
+   * @return the registered peer ids; null if listing failed (the abortable is then aborted),
+   *         and presumably also null if the peers znode does not exist
+   */
+  @Override
+  public List<String> getAllPeerIds() {
+    List<String> ids = null;
+    try {
+      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return ids;
+  }
+
+  /**
+   * A private method used during initialization. This method attempts to connect to all registered
+   * peer clusters. This method does not set a watch on the peer cluster znodes.
+   * @throws IOException
+   * @throws KeeperException
+   */
+  private void connectExistingPeers() throws IOException, KeeperException {
+    List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    if (znodes != null) {
+      for (String z : znodes) {
+        connectToPeer(z);
+      }
+    }
+  }
+
+  /**
+   * Reloads the ZooKeeper watcher of a peer cluster when <code>ke</code> is a connection loss or
+   * an expired session; any other KeeperException is ignored here.
+   * @param ke failure observed while talking to the peer's ZooKeeper
+   * @param peer peer whose watcher should be reloaded
+   */
+  private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
+    if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException) {
+      LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
+      try {
+        peer.reloadZkWatcher();
+      } catch (IOException io) {
+        LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
+      }
+    }
+  }
+
+  /**
+   * Get the list of all the region servers from the specified peer
+   * @param zkw zk connection to the peer cluster
+   * @return region server addresses, or an empty list if the peer's rs znode does not exist
+   * @throws KeeperException if the peer's ZooKeeper cannot be read
+   */
+  private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
+    if (children == null) {
+      return Collections.emptyList();
+    }
+    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
+    for (String child : children) {
+      addresses.add(ServerName.parseServerName(child));
+    }
+    return addresses;
+  }
+
+  /**
+   * @return path of the peer-state znode of peer <code>id</code>: peersZNode/id/peerStateNodeName
+   */
+  private String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  /**
+   * Update the state znode of a peer cluster, creating it if it is missing.
+   * @param id id of a registered peer
+   * @param state new state of the peer
+   * @throws IllegalArgumentException if the peer is not registered
+   * @throws IOException if ZooKeeper cannot be updated
+   */
+  private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
+      throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " is not registered");
+      }
+      String peerStateZNode = getPeerStateNode(id);
+      byte[] stateBytes =
+          (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
+              : DISABLED_ZNODE_BYTES;
+      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
+      } else {
+        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
+      }
+      LOG.info("state of the peer " + id + " changed to " + state.name());
+    } catch (KeeperException e) {
+      throw new IOException("Unable to change state of the peer " + id, e);
+    }
+  }
+
+  /**
+   * Helper method to connect to a peer
+   * @param peerId peer's identifier
+   * @return object representing the peer, or null if its configuration cannot be built or it
+   *         points back at our own cluster
+   * @throws IOException
+   * @throws KeeperException
+   */
+  private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException {
+    Configuration peerConf = getPeerConf(peerId);
+    if (peerConf == null) {
+      return null;
+    }
+    String peerClusterKey = ZKUtil.getZooKeeperClusterKey(peerConf);
+    if (this.ourClusterKey.equals(peerClusterKey)) {
+      LOG.debug("Not connecting to " + peerId + " because it's us");
+      return null;
+    }
+
+    ReplicationPeer peer = new ReplicationPeer(peerConf, peerId, peerClusterKey);
+    peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    return peer;
+  }
+
+  /**
+   * @param bytes Content of a peer znode; must not be null.
+   * @return ClusterKey parsed from the passed bytes: the protobuf payload when the pb magic prefix
+   *         is present, otherwise the raw bytes as a string ("" when empty).
+   * @throws DeserializationException if the protobuf payload cannot be parsed
+   */
+  private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationPeer.Builder builder =
+          ZooKeeperProtos.ReplicationPeer.newBuilder();
+      ZooKeeperProtos.ReplicationPeer peer;
+      try {
+        peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return peer.getClusterkey();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toString(bytes);
+      }
+      return "";
+    }
+  }
+
+  /**
+   * @param clusterKey cluster key of a peer
+   * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended, suitable
+   *         as the content of a peer znode, i.e. /hbase/replication/peers/PEER_ID
+   */
+  private static byte[] toByteArray(final String clusterKey) {
+    byte[] bytes =
+        ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
+            .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java Sat May 18 00:00:20 2013
@@ -22,12 +22,14 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for maintaining a region server's replication queues. These queues
  * keep track of the HLogs that still need to be replicated to remote clusters.
  */
+@InterfaceAudience.Private
 public interface ReplicationQueues {
 
   /**
@@ -35,7 +37,7 @@ public interface ReplicationQueues {
    * @param serverName The server name of the region server that owns the replication queues this
    *          interface manages.
    */
-  public void init(String serverName);
+  public void init(String serverName) throws KeeperException;
 
   /**
    * Remove a replication queue.

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java Sat May 18 00:00:20 2013
@@ -39,8 +39,27 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.InvalidProtocolBufferException;
-
+/**
+ * This class provides an implementation of the ReplicationQueues interface using Zookeeper. The
+ * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
+ * all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is
+ * the regionserver name (a concatenation of the region server's hostname, client port and start
+ * code). For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ *
+ * Within this znode, the region server maintains a set of HLog replication queues. These queues are
+ * represented by child znodes named after their queue id. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1
+ * /hbase/replication/rs/hostname.example.org,6020,1234/2
+ *
+ * Each queue has one child znode for every HLog that still needs to be replicated. The value of
+ * these HLog child znodes is the latest position that has been replicated. This position is updated
+ * every time a HLog entry is replicated. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
+ */
 public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
 
   /** Znode containing all replication queues for this region server. */
@@ -50,14 +69,15 @@ public class ReplicationQueuesZKImpl ext
 
   private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
 
-  public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable)
-      throws KeeperException {
+  public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, 
+      Abortable abortable) {
     super(zk, conf, abortable);
   }
 
   @Override
-  public void init(String serverName) {
+  public void init(String serverName) throws KeeperException {
     this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+    ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
   }
 
   @Override
@@ -94,7 +114,7 @@ public class ReplicationQueuesZKImpl ext
       String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
       znode = ZKUtil.joinZNode(znode, filename);
       // Why serialize String of Long and not Long as bytes?
-      ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
+      ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
     } catch (KeeperException e) {
       this.abortable.abort("Failed to write replication hlog position (filename=" + filename
           + ", position=" + position + ")", e);
@@ -107,7 +127,7 @@ public class ReplicationQueuesZKImpl ext
     String znode = ZKUtil.joinZNode(clusterZnode, filename);
     byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
     try {
-      return parseHLogPositionFrom(bytes);
+      return ZKUtil.parseHLogPositionFrom(bytes);
     } catch (DeserializationException de) {
       LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
           + "znode content, continuing.");
@@ -351,7 +371,7 @@ public class ReplicationQueuesZKImpl ext
           byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
           long position = 0;
           try {
-            position = parseHLogPositionFrom(positionBytes);
+            position = ZKUtil.parseHLogPositionFrom(positionBytes);
           } catch (DeserializationException e) {
             LOG.warn("Failed parse of hlog position from the following znode: " + z
                 + ", Exception: " + e);
@@ -380,44 +400,4 @@ public class ReplicationQueuesZKImpl ext
         ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
     return ProtobufUtil.prependPBMagic(bytes);
   }
-
-  /**
-   * @param position
-   * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable
-   *         for use as content of an hlog position in a replication queue.
-   */
-  static byte[] toByteArray(final long position) {
-    byte[] bytes =
-        ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build()
-            .toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-  /**
-   * @param bytes - Content of a HLog position znode.
-   * @return long - The current HLog position.
-   * @throws DeserializationException
-   */
-  private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
-    if(bytes == null) {
-      throw new DeserializationException("Unable to parse null HLog position.");
-    }
-    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
-          ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
-      ZooKeeperProtos.ReplicationHLogPosition position;
-      try {
-        position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new DeserializationException(e);
-      }
-      return position.getPosition();
-    } else {
-      if (bytes.length > 0) {
-        return Bytes.toLong(bytes);
-      }
-      return 0;
-    }
-  }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java Sat May 18 00:00:20 2013
@@ -54,8 +54,6 @@ public class ReplicationStateImpl extend
     // Set a tracker on replicationStateNode
     this.stateTracker =
         new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
-    stateTracker.start();
-    readReplicationStateZnode();
   }
 
   public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
@@ -64,6 +62,13 @@ public class ReplicationStateImpl extend
   }
 
   @Override
+  public void init() throws KeeperException {
+    ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
+    stateTracker.start();
+    readReplicationStateZnode();
+  }
+
+  @Override
   public boolean getState() throws KeeperException {
     return getReplication();
   }
@@ -115,8 +120,7 @@ public class ReplicationStateImpl extend
    */
   private void setReplicating(boolean newState) throws KeeperException {
     ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
-    byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
-        : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
+    byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
     ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
   }
 

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java Sat May 18 00:00:20 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.Closeable;
@@ -27,19 +28,23 @@ import java.io.Closeable;
  * cluster. This state is used to indicate whether replication is enabled or
  * disabled on a cluster.
  */
+@InterfaceAudience.Private
 public interface ReplicationStateInterface extends Closeable {
-	
+
+  /**
+   * Initialize the replication state interface.
+   */
+  public void init() throws KeeperException;
+
   /**
    * Get the current state of replication (i.e. ENABLED or DISABLED).
-   * 
    * @return true if replication is enabled, false otherwise
    * @throws KeeperException
    */
   public boolean getState() throws KeeperException;
-	
+
   /**
    * Set the state of replication.
-   * 
    * @param newState
    * @throws KeeperException
    */

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java Sat May 18 00:00:20 2013
@@ -22,6 +22,8 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -51,7 +53,13 @@ public abstract class ReplicationStateZK
   protected final Configuration conf;
   protected final Abortable abortable;
 
-  public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, 
+  // Public for testing
+  public static final byte[] ENABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+  protected static final byte[] DISABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+
+  public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
       Abortable abortable) {
     this.zookeeper = zookeeper;
     this.conf = conf;
@@ -79,8 +87,20 @@ public abstract class ReplicationStateZK
     return result;
   }
 
+  /**
+   * @param state
+   * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
+   *         use as content of either the cluster state znode -- whether or not we should be
+   *         replicating kept in /hbase/replication/state -- or as content of a peer-state znode
+   *         under a peer cluster id as in /hbase/replication/peers/PEER_ID/peer-state.
+   */
+  protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
+    byte[] bytes =
+        ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build().toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
   public boolean peerExists(String id) throws KeeperException {
-    return ZKUtil.checkExists(this.zookeeper,
-        ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
+    return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
   }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Sat May 18 00:00:20 2013
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -26,29 +25,18 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -83,63 +71,37 @@ import java.util.concurrent.atomic.Atomi
  */
 @InterfaceAudience.Private
 public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
-  private static final Log LOG =
-    LogFactory.getLog(ReplicationZookeeper.class);
+  private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
 
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
-  // Map of peer clusters keyed by their id
-  private Map<String, ReplicationPeer> peerClusters;
-  // Path to the root replication znode
-  private String replicationZNode;
-  // Path to the peer clusters znode
   private String peersZNode;
-  // Path to the znode that contains all RS that replicates
-  private String rsZNode;
-  // Path to this region server's name under rsZNode
-  private String rsServerNameZnode;
-  // Name node if the replicationState znode
-  private String replicationStateNodeName;
-  // Name of zk node which stores peer state. The peer-state znode is under a
-  // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
-  private String peerStateNodeName;
   private final Configuration conf;
-  // The key to our own cluster
-  private String ourClusterKey;
   // Abortable
   private Abortable abortable;
   private final ReplicationStateInterface replicationState;
+  private final ReplicationPeers replicationPeers;
   private final ReplicationQueues replicationQueues;
 
   /**
-   * ZNode content if enabled state.
-   */
-  // Public so it can be seen by test code.
-  public static final byte[] ENABLED_ZNODE_BYTES =
-      toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
-
-  /**
-   * ZNode content if disabled state.
-   */
-  static final byte[] DISABLED_ZNODE_BYTES =
-      toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
-
-  /**
    * Constructor used by clients of replication (like master and HBase clients)
    * @param conf  conf to use
    * @param zk    zk connection to use
    * @throws IOException
    */
   public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
-      final ZooKeeperWatcher zk) throws KeeperException {
+      final ZooKeeperWatcher zk) throws KeeperException, IOException {
     super(zk, conf, abortable);
     this.conf = conf;
     this.zookeeper = zk;
     setZNodes(abortable);
     this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
+    this.replicationState.init();
     // TODO This interface is no longer used by anyone using this constructor. When this class goes
     // away, we will no longer have this null initialization business
     this.replicationQueues = null;
+    this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, abortable);
+    this.replicationPeers.init();
   }
 
   /**
@@ -157,39 +119,19 @@ public class ReplicationZookeeper extend
     this.zookeeper = server.getZooKeeper();
     this.conf = server.getConfiguration();
     setZNodes(server);
-
     this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
-    this.peerClusters = new HashMap<String, ReplicationPeer>();
-    ZKUtil.createWithParents(this.zookeeper,
-        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
-    this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
-    ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
+    this.replicationState.init();
     this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
     this.replicationQueues.init(server.getServerName().toString());
-    connectExistingPeers();
+    this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
+    this.replicationPeers.init();
   }
 
   private void setZNodes(Abortable abortable) throws KeeperException {
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
-    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
-    this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
-    String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
-    this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
-    this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+    String replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
-    ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-    this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
-    ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
-  }
-
-  private void connectExistingPeers() throws IOException, KeeperException {
-    List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    if (znodes != null) {
-      for (String z : znodes) {
-        connectToPeer(z);
-      }
-    }
   }
 
   /**
@@ -197,39 +139,15 @@ public class ReplicationZookeeper extend
    * @return list of all peers' identifiers
    */
   public List<String> listPeersIdsAndWatch() {
-    List<String> ids = null;
-    try {
-      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    }
-    return ids;
+    return this.replicationPeers.getAllPeerIds();
   }
 
   /**
    * Map of this cluster's peers for display.
    * @return A map of peer ids to peer cluster keys
    */
-  public Map<String,String> listPeers() {
-    Map<String,String> peers = new TreeMap<String,String>();
-    List<String> ids = null;
-    try {
-      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-      for (String id : ids) {
-        byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
-        String clusterKey = null;
-        try {
-          clusterKey = parsePeerFrom(bytes);
-        } catch (DeserializationException de) {
-          LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
-          continue;
-        }
-        peers.put(id, clusterKey);
-      }
-    } catch (KeeperException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    }
-    return peers;
+  public Map<String, String> listPeers() {
+    return this.replicationPeers.getAllPeerClusterKeys();
   }
 
   /**
@@ -239,126 +157,17 @@ public class ReplicationZookeeper extend
    * @return addresses of all region servers
    */
   public List<ServerName> getSlavesAddresses(String peerClusterId) {
-    if (this.peerClusters.size() == 0) {
-      return Collections.emptyList();
-    }
-    ReplicationPeer peer = this.peerClusters.get(peerClusterId);
-    if (peer == null) {
-      return Collections.emptyList();
-    }
-    
-    List<ServerName> addresses;
-    try {
-      addresses = fetchSlavesAddresses(peer.getZkw());
-    } catch (KeeperException ke) {
-      reconnectPeer(ke, peer);
-      addresses = Collections.emptyList();
-    }
-    peer.setRegionServers(addresses);
-    return peer.getRegionServers();
-  }
-
-  /**
-   * Get the list of all the region servers from the specified peer
-   * @param zkw zk connection to use
-   * @return list of region server addresses or an empty list if the slave
-   * is unavailable
-   */
-  private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
-    throws KeeperException {
-    return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
-  }
-
-  /**
-   * Lists the children of the specified znode, retrieving the data of each
-   * child as a server address.
-   *
-   * Used to list the currently online regionservers and their addresses.
-   *
-   * Sets no watches at all, this method is best effort.
-   *
-   * Returns an empty list if the node has no children.  Returns null if the
-   * parent node itself does not exist.
-   *
-   * @param zkw zookeeper reference
-   * @param znode node to get children of as addresses
-   * @return list of data of children of specified znode, empty if no children,
-   *         null if parent does not exist
-   * @throws KeeperException if unexpected zookeeper exception
-   */
-  public static List<ServerName> listChildrenAndGetAsServerNames(
-      ZooKeeperWatcher zkw, String znode)
-  throws KeeperException {
-    List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
-    if(children == null) {
-      return Collections.emptyList();
-    }
-    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
-    for (String child : children) {
-      addresses.add(ServerName.parseServerName(child));
-    }
-    return addresses;
+    return this.replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
   }
 
   /**
    * This method connects this cluster to another one and registers it
    * in this region server's replication znode
    * @param peerId id of the peer cluster
-   * @throws KeeperException 
-   */
-  public boolean connectToPeer(String peerId)
-      throws IOException, KeeperException {
-    if (peerClusters == null) {
-      return false;
-    }
-    if (this.peerClusters.containsKey(peerId)) {
-      return false;
-    }
-    ReplicationPeer peer = getPeer(peerId);
-    if (peer == null) {
-      return false;
-    }
-    this.peerClusters.put(peerId, peer);
-    ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
-        this.rsServerNameZnode, peerId));
-    LOG.info("Added new peer cluster " + peer.getClusterKey());
-    return true;
-  }
-
-  /**
-   * Helper method to connect to a peer
-   * @param peerId peer's identifier
-   * @return object representing the peer
-   * @throws IOException
    * @throws KeeperException
    */
-  public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
-    String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
-    byte [] data = ZKUtil.getData(this.zookeeper, znode);
-    String otherClusterKey = "";
-    try {
-      otherClusterKey = parsePeerFrom(data);
-    } catch (DeserializationException e) {
-      LOG.warn("Failed parse of cluster key from peerId=" + peerId
-          + ", specifically the content from the following znode: " + znode);
-    }
-    if (this.ourClusterKey.equals(otherClusterKey)) {
-      LOG.debug("Not connecting to " + peerId + " because it's us");
-      return null;
-    }
-    // Construct the connection to the new peer
-    Configuration otherConf = new Configuration(this.conf);
-    try {
-      ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
-    } catch (IOException e) {
-      LOG.error("Can't get peer because:", e);
-      return null;
-    }
-
-    ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
-        otherClusterKey);
-    peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
-    return peer;
+  public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+    return this.replicationPeers.connectToPeer(peerId);
   }
 
   /**
@@ -368,15 +177,7 @@ public class ReplicationZookeeper extend
    * @throws IllegalArgumentException Thrown when the peer doesn't exist
    */
   public void removePeer(String id) throws IOException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot remove inexisting peer");
-      }
-      ZKUtil.deleteNodeRecursively(this.zookeeper,
-          ZKUtil.joinZNode(this.peersZNode, id));
-    } catch (KeeperException e) {
-      throw new IOException("Unable to remove a peer", e);
-    }
+    this.replicationPeers.removePeer(id);
   }
 
   /**
@@ -388,154 +189,7 @@ public class ReplicationZookeeper extend
    *         multi-slave isn't supported yet.
    */
   public void addPeer(String id, String clusterKey) throws IOException {
-    try {
-      if (peerExists(id)) {
-        throw new IllegalArgumentException("Cannot add existing peer");
-      }
-      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-      ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
-        toByteArray(clusterKey));
-      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
-      // peer-state znode. This happens while adding a peer.
-      // The peer state data is set as "ENABLED" by default.
-      ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
-        ENABLED_ZNODE_BYTES);
-      // A peer is enabled by default
-    } catch (KeeperException e) {
-      throw new IOException("Unable to add peer", e);
-    }
-  }
-
-  /**
-   * @param clusterKey
-   * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix
-   *         prepended suitable for use as content of a this.peersZNode; i.e.
-   *         the content of PEER_ID znode under /hbase/replication/peers/PEER_ID
-   */
-  static byte[] toByteArray(final String clusterKey) {
-    byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
-        .toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-  /**
-   * @param state
-   * @return Serialized protobuf of <code>state</code> with pb magic prefix
-   *         prepended suitable for use as content of either the cluster state
-   *         znode -- whether or not we should be replicating kept in
-   *         /hbase/replication/state -- or as content of a peer-state znode
-   *         under a peer cluster id as in
-   *         /hbase/replication/peers/PEER_ID/peer-state.
-   */
-  static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
-    byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
-        .toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-  /**
-   * @param position
-   * @return Serialized protobuf of <code>position</code> with pb magic prefix
-   *         prepended suitable for use as content of an hlog position in a
-   *         replication queue.
-   */
-  public static byte[] positionToByteArray(
-      final long position) {
-    return ZKUtil.positionToByteArray(position);
-  }
-
-  /**
-   * @param lockOwner
-   * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
-   *         prepended suitable for use as content of an replication lock during
-   *         region server fail over.
-   */
-  static byte[] lockToByteArray(
-      final String lockOwner) {
-    byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
-        .toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
-  /**
-   * @param bytes Content of a peer znode.
-   * @return ClusterKey parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
-    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
-          .newBuilder();
-      ZooKeeperProtos.ReplicationPeer peer;
-      try {
-        peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new DeserializationException(e);
-      }
-      return peer.getClusterkey();
-    } else {
-      if (bytes.length > 0) {
-        return Bytes.toString(bytes);
-      }
-      return "";
-    }
-  }
-
-  /**
-   * @param bytes Content of a state znode.
-   * @return State parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
-      throws DeserializationException {
-    ProtobufUtil.expectPBMagicPrefix(bytes);
-    int pblen = ProtobufUtil.lengthOfPBMagic();
-    ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
-        .newBuilder();
-    ZooKeeperProtos.ReplicationState state;
-    try {
-      state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      return state.getState();
-    } catch (InvalidProtocolBufferException e) {
-      throw new DeserializationException(e);
-    }
-  }
-
-  /**
-   * @param bytes - Content of a HLog position znode.
-   * @return long - The current HLog position.
-   * @throws DeserializationException
-   */
-  public static long parseHLogPositionFrom(
-      final byte[] bytes) throws DeserializationException {
-    return ZKUtil.parseHLogPositionFrom(bytes);
-  }
-
-  /**
-   * @param bytes - Content of a lock znode.
-   * @return String - The owner of the lock.
-   * @throws DeserializationException
-   */
-  static String parseLockOwnerFrom(
-      final byte[] bytes) throws DeserializationException {
-    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
-          .newBuilder();
-      ZooKeeperProtos.ReplicationLock lock;
-      try {
-        lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new DeserializationException(e);
-      }
-      return lock.getLockOwner();
-    } else {
-      if (bytes.length > 0) {
-        return Bytes.toString(bytes);
-      }
-      return "";
-    }
+    this.replicationPeers.addPeer(id, clusterKey);
   }
 
   /**
@@ -546,8 +200,7 @@ public class ReplicationZookeeper extend
    *           Thrown when the peer doesn't exist
    */
   public void enablePeer(String id) throws IOException {
-    changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
-    LOG.info("peer " + id + " is enabled");
+    this.replicationPeers.enablePeer(id);
   }
 
   /**
@@ -558,28 +211,7 @@ public class ReplicationZookeeper extend
    *           Thrown when the peer doesn't exist
    */
   public void disablePeer(String id) throws IOException {
-    changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
-    LOG.info("peer " + id + " is disabled");
-  }
-
-  private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
-      throws IOException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("peer " + id + " is not registered");
-      }
-      String peerStateZNode = getPeerStateNode(id);
-      byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
-          : DISABLED_ZNODE_BYTES;
-      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
-        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
-      } else {
-        ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
-      }
-      LOG.info("state of the peer " + id + " changed to " + state.name());
-    } catch (KeeperException e) {
-      throw new IOException("Unable to change state of the peer " + id, e);
-    }
+    this.replicationPeers.disablePeer(id);
   }
 
   /**
@@ -592,14 +224,7 @@ public class ReplicationZookeeper extend
    *           Thrown when the peer doesn't exist
    */
   public boolean getPeerEnabled(String id) {
-    if (!this.peerClusters.containsKey(id)) {
-      throw new IllegalArgumentException("peer " + id + " is not registered");
-    }
-    return this.peerClusters.get(id).getPeerEnabled().get();
-  }
-
-  private String getPeerStateNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+    return this.replicationPeers.getStatusOfConnectedPeer(id);
   }
 
   /**
@@ -683,8 +308,7 @@ public class ReplicationZookeeper extend
   public void deleteSource(String peerZnode, boolean closeConnection) {
     this.replicationQueues.removeQueue(peerZnode);
     if (closeConnection) {
-      this.peerClusters.get(peerZnode).getZkw().close();
-      this.peerClusters.remove(peerZnode);
+      // Replaces the old "close the peer's ZooKeeperWatcher, then drop it from the map" sequence.
+      this.replicationPeers.disconnectFromPeer(peerZnode);
     }
   }
 
@@ -714,40 +338,7 @@ public class ReplicationZookeeper extend
    * @return a UUID or null if there's a ZK connection issue
    */
   public UUID getPeerUUID(String peerId) {
-    ReplicationPeer peer = getPeerClusters().get(peerId);
-    UUID peerUUID = null;
-    try {
-      peerUUID = getUUIDForCluster(peer.getZkw());
-    } catch (KeeperException ke) {
-      reconnectPeer(ke, peer);
-    }
-    return peerUUID;
-  }
-
-  /**
-   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
-   * @param zkw watcher connected to an ensemble
-   * @return the UUID read from zookeeper
-   * @throws KeeperException
-   */
-  public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
-    return UUID.fromString(ZKClusterId.readClusterIdZNode(zkw));
-  }
-
-  private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
-    if (ke instanceof ConnectionLossException
-      || ke instanceof SessionExpiredException) {
-      LOG.warn(
-        "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
-        ke);
-      try {
-        peer.reloadZkWatcher();
-      } catch(IOException io) {
-        LOG.warn(
-          "Creation of ZookeeperWatcher failed for peer "
-            + peer.getClusterKey(), io);
-      }
-    }
+    // The lookup/reconnect logic removed above is delegated to ReplicationPeers
+    // (NOTE(review): confirm the ZK implementation still reloads the watcher on connection
+    // loss / session expiry). The UUID parsing helper moved to ZKClusterId#getUUIDForCluster.
+    return this.replicationPeers.getPeerUUID(peerId);
   }
 
   public void registerRegionServerListener(ZooKeeperListener listener) {
@@ -758,8 +349,8 @@ public class ReplicationZookeeper extend
    * Get a map of all peer clusters
    * @return map of peer cluster keyed by id
    */
-  public Map<String, ReplicationPeer> getPeerClusters() {
-    return this.peerClusters;
+  public Set<String> getPeerClusters() {
+    return this.replicationPeers.getConnectedPeers();
   }
 
   /**
@@ -802,36 +393,4 @@ public class ReplicationZookeeper extend
   public void close() throws IOException {
+    // NOTE(review): only replicationState is closed here; confirm whether connections held by
+    // replicationPeers also need to be released on close.
     if (replicationState != null) replicationState.close();
   }
-
-  /**
-   * Utility method to ensure an ENABLED znode is in place; if not present, we
-   * create it.
-   * @param zookeeper
-   * @param path Path to znode to check
-   * @return True if we created the znode.
-   * @throws NodeExistsException
-   * @throws KeeperException
-   */
-  static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
-      throws NodeExistsException, KeeperException {
-    if (ZKUtil.checkExists(zookeeper, path) == -1) {
-      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
-      // peer-state znode. This happens while adding a peer.
-      // The peer state data is set as "ENABLED" by default.
-      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * @param bytes
-   * @return True if the passed in <code>bytes</code> are those of a pb
-   *         serialized ENABLED state.
-   * @throws DeserializationException
-   */
-  static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
-    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
-    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
-  }
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java Sat May 18 00:00:20 2013
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.zookeeper;
 
+import java.util.UUID;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ClusterId;
@@ -77,4 +79,14 @@ public class ZKClusterId {
       throws KeeperException {
     ZKUtil.createSetData(watcher, watcher.clusterIdZNode, id.toByteArray());
   }
+
+  /**
+   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions.
+   * The cluster id znode is read with {@link #readClusterIdZNode(ZooKeeperWatcher)} and parsed
+   * with {@link UUID#fromString(String)}.
+   * NOTE(review): UUID.fromString throws NullPointerException for null and
+   * IllegalArgumentException for malformed input; confirm what readClusterIdZNode returns when
+   * the znode is absent.
+   * @param zkw watcher connected to an ensemble
+   * @return the UUID read from zookeeper
+   * @throws KeeperException if reading the cluster id znode fails
+   */
+  public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
+    return UUID.fromString(readClusterIdZNode(zkw));
+  }
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Sat May 18 00:00:20 2013
@@ -1836,9 +1836,12 @@ public class ZKUtil {
    * @throws DeserializationException
    */
   public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+    if (bytes == null) {
+      throw new DeserializationException("Unable to parse null HLog position.");
+    }
     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
       int pblen = ProtobufUtil.lengthOfPBMagic();
-      ZooKeeperProtos.ReplicationHLogPosition.Builder builder = 
+      ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
           ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
       ZooKeeperProtos.ReplicationHLogPosition position;
       try {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Sat May 18 00:00:20 2013
@@ -45,8 +45,10 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -176,17 +178,10 @@ public class Import {
       cfRenameMap = createCfRenameMap(conf);
       filter = instantiateFilter(conf);
       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
-      ReplicationZookeeper zkHelper = null;
       ZooKeeperWatcher zkw = null;
       try {
-        HConnection connection = HConnectionManager.getConnection(conf);
         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
-        zkHelper = new ReplicationZookeeper(connection, conf, zkw);
-        try {
-          this.clusterId = zkHelper.getUUIDForCluster(zkw);
-        } finally {
-          if (zkHelper != null) zkHelper.close();
-        }
+        clusterId = ZKClusterId.getUUIDForCluster(zkw);
       } catch (ZooKeeperConnectionException e) {
         LOG.error("Problem connecting to ZooKeper during task setup", e);
       } catch (KeeperException e) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Sat May 18 00:00:20 2013
@@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -117,11 +119,13 @@ public class VerifyReplication {
                 @Override public void abort(String why, Throwable e) {}
                 @Override public boolean isAborted() {return false;}
               });
-              zk = new ReplicationZookeeper(conn, conf, localZKW);
-              // Just verifying it we can connect
-              peer = zk.getPeer(peerId);
-              HTable replicatedTable = new HTable(peer.getConfiguration(),
-                  conf.get(NAME+".tableName"));
+              ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW);
+              rp.init();
+              Configuration peerConf = rp.getPeerConf(peerId);
+              if (peerConf == null) {
+                throw new IOException("Couldn't get peer conf!");
+              }
+              HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
               scan.setStartRow(value.getRow());
               replicatedScanner = replicatedTable.getScanner(scan);
             } catch (KeeperException e) {
@@ -175,42 +179,6 @@ public class VerifyReplication {
     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
       throw new IOException("Replication needs to be enabled to verify it.");
     }
-    HConnectionManager.execute(new HConnectable<Void>(conf) {
-      @Override
-      public Void connect(HConnection conn) throws IOException {
-        ZooKeeperWatcher localZKW = null;
-        ReplicationZookeeper zk = null;
-        ReplicationPeer peer = null;
-        try {
-          localZKW = new ZooKeeperWatcher(
-            conf, "VerifyReplication", new Abortable() {
-            @Override public void abort(String why, Throwable e) {}
-            @Override public boolean isAborted() {return false;}
-          });
-          zk = new ReplicationZookeeper(conn, conf, localZKW);
-          // Just verifying it we can connect
-          peer = zk.getPeer(peerId);
-          if (peer == null) {
-            throw new IOException("Couldn't get access to the slave cluster," +
-                "please see the log");
-          }
-        } catch (KeeperException ex) {
-          throw new IOException("Couldn't get access to the slave cluster" +
-              " because: ", ex);
-        } finally {
-          if (peer != null){
-             peer.close();
-          }
-          if (zk != null){
-            zk.close();
-          }
-          if (localZKW != null){
-            localZKW.close();
-          }
-        }
-        return null;
-      }
-    });
     conf.set(NAME+".peerId", peerId);
     conf.set(NAME+".tableName", tableName);
     conf.setLong(NAME+".startTime", startTime);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Sat May 18 00:00:20 2013
@@ -136,6 +136,7 @@ public class ReplicationLogCleaner exten
       this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
       this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
       this.replicationState = new ReplicationStateImpl(zkw, conf, this);
+      this.replicationState.init();
     } catch (KeeperException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat May 18 00:00:20 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replicati
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
@@ -189,7 +190,7 @@ public class ReplicationSource extends T
     this.metrics = new MetricsSource(peerClusterZnode);
     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
     try {
-      this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
+      this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher());
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Sat May 18 00:00:20 2013
@@ -177,7 +177,7 @@ public class ReplicationSourceManager {
    * old region server hlog queues
    */
   public void init() throws IOException {
-    for (String id : this.zkHelper.getPeerClusters().keySet()) {
+    for (String id : this.zkHelper.getPeerClusters()) {
       addSource(id);
     }
     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@@ -601,7 +601,7 @@ public class ReplicationSourceManager {
         try {
           ReplicationSourceInterface src = getReplicationSource(conf,
               fs, ReplicationSourceManager.this, stopper, replicating, peerId);
-          if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+          if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             break;
           }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java?rev=1484031&r1=1484030&r2=1484031&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java Sat May 18 00:00:20 2013
@@ -20,12 +20,17 @@ package org.apache.hadoop.hbase.replicat
 
 import static org.junit.Assert.*;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -41,6 +46,26 @@ public abstract class TestReplicationSta
   protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString();
   protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString();
   protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString();
+  protected ReplicationPeers rp;
+  protected static final String ID_ONE = "1";
+  protected static final String ID_TWO = "2";
+  protected static String KEY_ONE;
+  protected static String KEY_TWO;
+
+  // For testing when we try to replicate to ourself
+  protected String OUR_ID = "3";
+  protected String OUR_KEY;
+
+  protected static int zkTimeoutCount;
+  protected static final int ZK_MAX_COUNT = 300;
+  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
+
+  /** Resets {@link #zkTimeoutCount}, the retry counter consulted by assertConnectedPeerStatus. */
+  @Before
+  public void setUp() {
+    zkTimeoutCount = 0;
+  }
 
   @Test
   public void testReplicationQueuesClient() throws KeeperException {
@@ -83,13 +108,15 @@ public abstract class TestReplicationSta
   }
 
   @Test
-  public void testReplicationQueues() throws KeeperException {
+  public void testReplicationQueues() throws KeeperException, IOException {
     rq1.init(server1);
     rq2.init(server2);
     rq3.init(server3);
+    //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
+    rp.init();
 
-    // Zero queues or replicators exist
-    assertEquals(0, rq1.getListOfReplicators().size());
+    // 3 replicators should exist
+    assertEquals(3, rq1.getListOfReplicators().size());
     rq1.removeQueue("bogus");
     rq1.removeLog("bogus", "bogus");
     rq1.removeAllQueues();
@@ -132,11 +159,102 @@ public abstract class TestReplicationSta
     assertEquals(0, rq2.getListOfReplicators().size());
   }
 
+  /**
+   * Exercises the ReplicationPeers API with unknown ids, with peers that are added but not
+   * connected, and with a connected peer (enable/disable, region servers, UUID), then
+   * disconnects that peer again.
+   */
+  @Test
+  public void testReplicationPeers() throws Exception {
+    rp.init();
+
+    // Test methods with non-existent peer ids
+    try {
+      rp.removePeer("bogus");
+      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      rp.enablePeer("bogus");
+      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      rp.disablePeer("bogus");
+      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      rp.getStatusOfConnectedPeer("bogus");
+      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    assertFalse(rp.connectToPeer("bogus"));
+    rp.disconnectFromPeer("bogus");
+    assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
+    assertNull(rp.getPeerUUID("bogus"));
+    assertNull(rp.getPeerConf("bogus"));
+    assertNumberOfPeers(0, 0);
+
+    // Add some peers
+    rp.addPeer(ID_ONE, KEY_ONE);
+    assertNumberOfPeers(0, 1);
+    rp.addPeer(ID_TWO, KEY_TWO);
+    assertNumberOfPeers(0, 2);
+
+    // Test methods with a peer that is added but not connected
+    try {
+      rp.getStatusOfConnectedPeer(ID_ONE);
+      fail("There are no connected peers, should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    assertNull(rp.getPeerUUID(ID_ONE));
+    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
+    rp.disconnectFromPeer(ID_ONE);
+    assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
+
+    // Connect to one peer
+    rp.connectToPeer(ID_ONE);
+    assertNumberOfPeers(1, 2);
+    assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+    rp.disablePeer(ID_ONE);
+    assertConnectedPeerStatus(false, ID_ONE);
+    rp.enablePeer(ID_ONE);
+    assertConnectedPeerStatus(true, ID_ONE);
+    assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
+    // Assert on the UUID itself: calling toString() first would turn a null UUID into an NPE,
+    // and a non-null String can never fail assertNotNull.
+    assertNotNull(rp.getPeerUUID(ID_ONE));
+
+    // Disconnect peer
+    rp.disconnectFromPeer(ID_ONE);
+    assertNumberOfPeers(0, 2);
+    try {
+      rp.getStatusOfConnectedPeer(ID_ONE);
+      fail("There are no connected peers, should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Polls the connected peer's status until it equals {@code status}, sleeping
+   * {@link #ZK_SLEEP_INTERVAL} ms between attempts. Each retry consumes one unit of
+   * {@link #zkTimeoutCount} (reset in {@link #setUp()}), and the test fails once that counter
+   * reaches {@link #ZK_MAX_COUNT}.
+   * @param status the expected peer status
+   * @param peerId id of a connected peer
+   * @throws Exception propagated from {@link Thread#sleep(long)} or the status lookup
+   */
+  protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
+    while (true) {
+      if (status == rp.getStatusOfConnectedPeer(peerId)) {
+        return;
+      }
+      if (zkTimeoutCount < ZK_MAX_COUNT) {
+        // Count every retry; without this the timeout branch is unreachable and the loop
+        // spins forever when the status never changes.
+        zkTimeoutCount++;
+        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
+            + ", sleeping and trying again.");
+        Thread.sleep(ZK_SLEEP_INTERVAL);
+      } else {
+        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
+      }
+    }
+  }
+
+  /**
+   * Asserts how many peers are connected and how many are registered. The registered count is
+   * checked against both the cluster-key listing and the id listing.
+   * @param connected expected size of {@code rp.getConnectedPeers()}
+   * @param total expected size of {@code rp.getAllPeerClusterKeys()} and {@code rp.getAllPeerIds()}
+   */
+  protected void assertNumberOfPeers(int connected, int total) {
+    assertEquals(total, rp.getAllPeerClusterKeys().size());
+    assertEquals(connected, rp.getConnectedPeers().size());
+    assertEquals(total, rp.getAllPeerIds().size());
+  }
+
   /*
    * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
    * 3, 4, 5 log files respectively
    */
-  protected void populateQueues() throws KeeperException {
+  protected void populateQueues() throws KeeperException, IOException {
     rq1.addLog("trash", "trash");
     rq1.removeQueue("trash");
 
@@ -147,6 +265,8 @@ public abstract class TestReplicationSta
       for (int j = 0; j < i; j++) {
         rq3.addLog("qId" + i, "filename" + j);
       }
+      //Add peers for the corresponding queues so they are not orphans
+      rp.addPeer("qId" + i, "bogus" + i);
     }
   }
 }