You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/28 02:31:19 UTC

svn commit: r990310 - in /hbase/branches/0.90_master_rewrite/src: main/java/org/apache/hadoop/hbase/replication/ main/java/org/apache/hadoop/hbase/replication/master/ main/java/org/apache/hadoop/hbase/replication/regionserver/ main/java/org/apache/hado...

Author: stack
Date: Sat Aug 28 00:31:18 2010
New Revision: 990310

URL: http://svn.apache.org/viewvc?rev=990310&view=rev
Log:

Bringing over ReplicationZooKeeperWatcher to use new ZK regime.
I renamed RZW as RZ.

M src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
M src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
  Renamed RZW as RZ.
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
  Javadoc.
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
  Added getData.
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
  javadoc.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
M src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
  Renamed RZW as RZ.
A src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
D src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
  Renamed

Added:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
Removed:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
Modified:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=990310&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Sat Aug 28 00:31:18 2010
@@ -0,0 +1,558 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * This class serves as a helper for all things related to zookeeper
+ * in replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent
+ * for the master cluster:
+ * <p/>
+ * <pre>
+ * replication/
+ *  master     {contains a full cluster address}
+ *  state      {contains true or false}
+ *  clusterId  {contains a byte}
+ *  peers/
+ *    1/   {contains a full cluster address}
+ *    2/
+ *    ...
+ *  rs/ {lists all RS that replicate}
+ *    startcode1/ {lists all peer clusters}
+ *      1/ {lists hlogs to process}
+ *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ *        10.10.1.76%3A53488.123456790
+ *        ...
+ *      2/
+ *      ...
+ *    startcode2/
+ *    ...
+ * </pre>
+ */
+public class ReplicationZookeeper {
+  private static final Log LOG =
+    LogFactory.getLog(ReplicationZookeeper.class);
+  // Name of znode we use to lock when failover
+  private final static String RS_LOCK_ZNODE = "lock";
+  // Our handle on zookeeper
+  private final ZooKeeperWatcher zookeeper;
+  // Map of addresses of peer clusters with their ZKW
+  private final Map<String, ReplicationZookeeper> peerClusters;
+  // Path to the root replication znode
+  private final String replicationZNode;
+  // Path to the peer clusters znode
+  private final String peersZNode;
+  // Path to the znode that contains all RS that replicates
+  private final String rsZNode;
+  // Path to this region server's name under rsZNode
+  private final String rsServerNameZnode;
+  // Name of the replicationState znode
+  private final String replicationStateNodeName;
+  // If this RS is part of a master cluster
+  private final boolean replicationMaster;
+  private final Configuration conf;
+  // Is this cluster replicating at the moment?
+  private final AtomicBoolean replicating;
+  // Byte (stored as string here) that identifies this cluster
+  private final String clusterId;
+  // Abortable
+  private final Abortable abortable;
+
+  /**
+   * Constructor used by region servers, connects to the peer cluster right away.
+   *
+   * @param server           hosting server; supplies the ZooKeeper watcher and is used as the Abortable
+   * @param conf             conf to use
+   * @param replicating    atomic boolean to start/stop replication
+   * @param rsName      the name of this region server, null if
+   *                         using RZH only to use the helping methods
+   * @throws IOException
+   * @throws KeeperException 
+   */
+  public ReplicationZookeeper(final Server server,
+      final Configuration conf, final AtomicBoolean replicating, String rsName)
+  throws IOException, KeeperException {
+    this.abortable = server;
+    this.zookeeper = server.getZooKeeper();
+    this.conf = conf;
+    String replicationZNodeName =
+        conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName =
+        conf.get("zookeeper.znode.replication.peers", "peers");
+    String repMasterZNodeName =
+        conf.get("zookeeper.znode.replication.master", "master");
+    this.replicationStateNodeName =
+        conf.get("zookeeper.znode.replication.state", "state");
+    String clusterIdZNodeName =
+        conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+    String rsZNodeName =
+        conf.get("zookeeper.znode.replication.rs", "rs");
+    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+          this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+          this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+    this.peerClusters = new HashMap<String, ReplicationZookeeper>();
+    this.replicationZNode =
+      ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+    this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+    this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
+
+    this.replicating = replicating;
+    setReplicating();
+    String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
+    byte [] data = ZKUtil.getData(this.zookeeper, znode);
+    String idResult = Bytes.toString(data);
+    this.clusterId = idResult == null?
+      Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
+
+    znode = ZKUtil.joinZNode(this.replicationZNode, repMasterZNodeName);
+    data = ZKUtil.getData(this.zookeeper, znode);
+    String address = Bytes.toString(data);
+    this.replicationMaster = thisCluster.equals(address);
+    LOG.info("This cluster (" + thisCluster + ") is a " +
+      (this.replicationMaster ? "master" : "slave") + " for replication" +
+        ", compared with (" + address + ")");
+
+    if (rsName != null) {
+      this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, rsName);
+      // Set a tracker on replicationStateNodeNode
+      ReplicationStatusTracker tracker =
+        new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server);
+      tracker.start();
+
+      List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      if (znodes != null) {
+        for (String z : znodes) {
+          connectToPeer(z);
+        }
+      }
+    } else {
+      this.rsServerNameZnode = null;
+    }
+
+  }
+
+  /**
+   * Returns all region servers from given peer
+   *
+   * @param peerClusterId (byte) the cluster to interrogate
+   * @return addresses of all region servers
+   */
+  public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+    if (this.peerClusters.size() == 0) {
+      return new ArrayList<HServerAddress>(0);
+    }
+    ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId);
+    return zkw == null?
+      new ArrayList<HServerAddress>(0):
+      zkw.scanAddressDirectory(this.zookeeper.rsZNode);
+  }
+
+  /**
+   * Scan a directory of address data.
+   * @param znode The parent node
+   * @return The directory contents as HServerAddresses
+   */
+  public List<HServerAddress> scanAddressDirectory(String znode) {
+    List<HServerAddress> list = new ArrayList<HServerAddress>();
+    List<String> nodes = null;
+    try {
+      nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Scanning " + znode, e);
+    }
+    if (nodes == null) {
+      return list;
+    }
+    for (String node : nodes) {
+      String path = ZKUtil.joinZNode(znode, node);
+      list.add(readAddress(path));
+    }
+    return list;
+  }
+
+  private HServerAddress readAddress(String znode) {
+    byte [] data = null;
+    try {
+      data = ZKUtil.getData(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Getting address", e);
+    }
+    return new HServerAddress(Bytes.toString(data));
+  }
+
+  /**
+   * This method connects this cluster to another one and registers it
+   * in this region server's replication znode
+   * @param peerId id of the peer cluster
+   * @throws KeeperException 
+   */
+  private void connectToPeer(String peerId) throws IOException, KeeperException {
+    String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+    byte [] data = ZKUtil.getData(this.zookeeper, znode);
+    String [] ensemble = Bytes.toString(data).split(":");
+    if (ensemble.length != 3) {
+      throw new IllegalArgumentException("Wrong format of cluster address: " +
+        Bytes.toStringBinary(data));
+    }
+    Configuration otherConf = new Configuration(this.conf);
+    otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+    otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+    otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+    // REENABLE -- FIX!!!!
+    /*
+    ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+        "connection to cluster: " + peerId);
+    zkw.registerListener(new ReplicationStatusWatcher());
+    this.peerClusters.put(peerId, zkw);
+    this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+        this.rsServerNameZnode, peerId));
+        */
+    LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+  }
+
+  /**
+   * This reads the state znode for replication and sets the atomic boolean
+   */
+  private void setReplicating() {
+    try {
+      byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
+      String value = Bytes.toString(data);
+      if (value == null) LOG.info(getRepStateNode() + " data is null");
+      else {
+        this.replicating.set(Boolean.parseBoolean(value));
+        LOG.info("Replication is now " + (this.replicating.get()?
+          "started" : "stopped"));
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
+    }
+  }
+
+  private String getRepStateNode() {
+    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+  }
+
+  /**
+   * Add a new log to the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void addLogToList(String filename, String clusterId) {
+    try {
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes(""));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed add log to list", e);
+    }
+  }
+
+  /**
+   * Remove a log from the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void removeLogFromList(String filename, String clusterId) {
+    try {
+      String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      ZKUtil.deleteChildrenRecursively(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed remove from list", e);
+    }
+  }
+
+  /**
+   * Set the current position of the specified cluster in the current hlog
+   * @param filename filename name of the hlog's znode
+   * @param clusterId clusterId name of the cluster's znode
+   * @param position the position in the file
+   * @throws IOException
+   */
+  public void writeReplicationStatus(String filename, String clusterId,
+      long position) {
+    try {
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      // Why serialize String of Long and not Long as bytes?
+      ZKUtil.createAndWatch(this.zookeeper, znode,
+        Bytes.toBytes(Long.toString(position)));
+    } catch (KeeperException e) {
+      this.abortable.abort("Writing replication status", e);
+    }
+  }
+
+  /**
+   * Get a list of all the other region servers in this cluster
+   * and set a watch
+   * @param watch the watch to set
+   * @return a list of server names
+   */
+  public List<String> getRegisteredRegionServers(Watcher watch) {
+    List<String> result = null;
+    try {
+      // TODO: This is rsZNode from zk which is like getListOfReplicators
+      // but maybe these are from different zk instances?
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of registered region servers", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of the replicators that have queues, they can be alive, dead
+   * or simply from a previous run
+   * @param watch the watche to set
+   * @return a list of server names
+   */
+  public List<String> getListOfReplicators() {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of replicators", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of peer clusters for the specified server names
+   * @param rs server names of the rs
+   * @param watch the watch to set
+   * @return a list of peer cluster
+   */
+  public List<String> getListPeersForRS(String rs) {
+    String znode = ZKUtil.joinZNode(rsZNode, rs);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of peers for rs", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of hlogs for the specified region server and peer cluster
+   * @param rs server names of the rs
+   * @param id peer cluster
+   * @param watch the watch to set
+   * @return a list of hlogs
+   */
+  public List<String> getListHLogsForPeerForRS(String rs, String id) {
+    String znode = ZKUtil.joinZNode(rsZNode, rs);
+    znode = ZKUtil.joinZNode(znode, id);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of hlogs for peer", e);
+    }
+    return result;
+  }
+
+  /**
+   * Try to set a lock in another server's znode.
+   * @param znode the server names of the other server
+   * @return true if the lock was acquired, false in every other case
+   */
+  public boolean lockOtherRS(String znode) {
+    try {
+      String parent = ZKUtil.joinZNode(this.rsZNode, znode);
+      String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+      ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed lock other rs", e);
+    }
+    return true;
+  }
+
+  /**
+   * This method copies all the hlog queues from another region server
+   * and returns them all sorted per peer cluster (appended with the dead
+   * server's znode)
+   * @param znode server names to copy
+   * @return all hlogs for all peers of that cluster, null if an error occurred
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+    // TODO this method isn't atomic enough, we could start copying and then
+    // TODO fail for some reason and we would end up with znodes we don't want.
+    SortedMap<String,SortedSet<String>> queues =
+        new TreeMap<String,SortedSet<String>>();
+    try {
+      String nodePath = ZKUtil.joinZNode(rsZNode, znode);
+      List<String> clusters =
+        ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
+      // We have a lock znode in there, it will count as one.
+      if (clusters == null || clusters.size() <= 1) {
+        return queues;
+      }
+      // The lock isn't a peer cluster, remove it
+      clusters.remove(RS_LOCK_ZNODE);
+      for (String cluster : clusters) {
+        // We add the name of the recovered RS to the new znode, we can even
+        // do that for queues that were recovered 10 times giving a znode like
+        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+        String newCluster = cluster+"-"+znode;
+        String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
+        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+          HConstants.EMPTY_BYTE_ARRAY);
+        String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+        // That region server didn't have anything to replicate for this cluster
+        if (hlogs == null || hlogs.size() == 0) {
+          continue;
+        }
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newCluster, logQueue);
+        for (String hlog : hlogs) {
+          String z = ZKUtil.joinZNode(clusterPath, hlog);
+          byte [] position = ZKUtil.getData(this.zookeeper, z);
+          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
+          String child = ZKUtil.joinZNode(newClusterZnode, hlog);
+          ZKUtil.createAndWatch(this.zookeeper, child, position);
+          logQueue.add(hlog);
+        }
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Copy queues from rs", e);
+    }
+    return queues;
+  }
+
+  /**
+   * Delete a complete queue of hlogs
+   * @param peerZnode znode of the peer cluster queue of hlogs to delete
+   */
+  public void deleteSource(String peerZnode) {
+    try {
+      ZKUtil.deleteChildrenRecursively(this.zookeeper,
+          ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed delete of " + peerZnode, e);
+    }
+  }
+
+  /**
+   * Recursive deletion of all znodes in specified rs' znode
+   * @param znode
+   */
+  public void deleteRsQueues(String znode) {
+    try {
+      ZKUtil.deleteChildrenRecursively(this.zookeeper,
+          ZKUtil.joinZNode(rsZNode, znode));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed delete of " + znode, e);
+    }
+  }
+
+  /**
+   * Delete this cluster's queues
+   */
+  public void deleteOwnRSZNode() {
+    deleteRsQueues(this.rsServerNameZnode);
+  }
+
+  /**
+   * Get the position of the specified hlog in the specified peer znode
+   * @param peerId znode of the peer cluster
+   * @param hlog name of the hlog
+   * @return the position in that hlog
+   * @throws KeeperException 
+   */
+  public long getHLogRepPosition(String peerId, String hlog)
+  throws KeeperException {
+    String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
+    String znode = ZKUtil.joinZNode(clusterZnode, hlog);
+    String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
+    return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+  }
+
+  /**
+   * Tells if this cluster replicates or not
+   *
+   * @return if this is a master
+   */
+  public boolean isReplicationMaster() {
+    return this.replicationMaster;
+  }
+
+  /**
+   * Get the identification of the cluster
+   *
+   * @return the id for the cluster
+   */
+  public String getClusterId() {
+    return this.clusterId;
+  }
+
+  /**
+   * Get a map of all peer clusters
+   * @return map of peer cluster, zk address to ZKW
+   */
+  public Map<String, ReplicationZookeeper> getPeerClusters() {
+    return this.peerClusters;
+  }
+
+  /**
+   * Tracker for status of the replication
+   */
+  public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
+    public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node,
+        Abortable abortable) {
+      super(watcher, node, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      super.nodeDataChanged(path);
+      setReplicating();
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Sat Aug 28 00:31:18 2010
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 // REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -45,7 +45,7 @@ public class ReplicationLogCleaner imple
   private static final Log LOG =
     LogFactory.getLog(ReplicationLogCleaner.class);
   private Configuration conf;
-  private ReplicationZookeeperWrapper zkHelper;
+  private ReplicationZookeeper zkHelper;
   private Set<String> hlogs = new HashSet<String>();
 
   /**

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Sat Aug 28 00:31:18 2010
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -47,7 +47,7 @@ public class Replication implements WALO
   private final ReplicationSourceManager replicationManager;
   private boolean replicationMaster;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
-  private final ReplicationZookeeperWrapper zkHelper;
+  private final ReplicationZookeeper zkHelper;
   private final Configuration conf;
   private ReplicationSink replicationSink;
   // Hosting server
@@ -68,7 +68,7 @@ public class Replication implements WALO
     this.conf = this.server.getConfiguration();
     this.replication = isReplication(this.conf);
     if (replication) {
-      this.zkHelper = new ReplicationZookeeperWrapper(server.getZooKeeper(),
+      this.zkHelper = new ReplicationZookeeper(server.getZooKeeper(),
         this.conf, this.replicating, this.server.getServerName());
       this.replicationMaster = zkHelper.isReplicationMaster();
       this.replicationManager = this.replicationMaster ?

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat Aug 28 00:31:18 2010
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.ipc.HRegi
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 
@@ -76,7 +76,7 @@ public class ReplicationSource extends T
   private HLog.Entry[] entriesArray;
   private HConnection conn;
   // Helper class for zookeeper
-  private ReplicationZookeeperWrapper zkHelper;
+  private ReplicationZookeeper zkHelper;
   private Configuration conf;
   // ratio of region servers to chose from a slave cluster
   private float ratio;

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Sat Aug 28 00:31:18 2010
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.Stoppable
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 
@@ -64,7 +64,7 @@ public class ReplicationSourceManager {
   // Indicates if we are currently replicating
   private final AtomicBoolean replicating;
   // Helper for zookeeper
-  private final ReplicationZookeeperWrapper zkHelper;
+  private final ReplicationZookeeper zkHelper;
   // All about stopping
   private final Stoppable stopper;
   // All logs we are currently trackign
@@ -91,7 +91,7 @@ public class ReplicationSourceManager {
    * @param logDir the directory that contains all hlog directories of live RSs
    * @param oldLogDir the directory where old logs are archived
    */
-  public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+  public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
                                   final Configuration conf,
                                   final Stoppable stopper,
                                   final FileSystem fs,
@@ -231,7 +231,7 @@ public class ReplicationSourceManager {
    * Get the ZK help of this manager
    * @return the helper
    */
-  public ReplicationZookeeperWrapper getRepZkWrapper() {
+  public ReplicationZookeeper getRepZkWrapper() {
     return zkHelper;
   }
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java Sat Aug 28 00:31:18 2010
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HBaseConfiguration;
 
 /**
- * Tool for reading ZooKeeper servers from HBase XML configuation and producing
+ * Tool for reading ZooKeeper servers from HBase XML configuration and producing
  * a line-by-line list for use by bash scripts.
  */
 public class ZKServerTool {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Sat Aug 28 00:31:18 2010
@@ -443,6 +443,31 @@ public class ZKUtil {
   //
 
   /**
+   * Get znode data. Does not set a watcher.
+   * @return ZNode data
+   */
+  public static byte [] getData(ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      byte [] data = zkw.getZooKeeper().getData(znode, null, null);
+      zkw.debug("Retrieved " + data.length + " bytes of data from znode " + znode);
+      return data;
+    } catch (KeeperException.NoNodeException e) {
+      zkw.debug("Unable to get data of znode " + znode + " " +
+          "because node does not exist (not an error)");
+      return null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.interruptedException(e);
+      return null;
+    }
+  }
+
+  /**
    * Get the data at the specified znode and set a watch.
    *
    * Returns the data and sets a watch if the node exists.  Returns null and no

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Sat Aug 28 00:31:18 2010
@@ -38,11 +38,11 @@ import org.apache.zookeeper.ZooKeeper;
  * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
  * for each Master, RegionServer, and client process.
  *
- * This is the only class that implements {@link Watcher}.  Other internal
+ * <p>This is the only class that implements {@link Watcher}.  Other internal
  * classes which need to be notified of ZooKeeper events must register with
  * the local instance of this watcher via {@link #registerListener}.
  *
- * This class also holds and manages the connection to ZooKeeper.  Code to deal
+ * <p>This class also holds and manages the connection to ZooKeeper.  Code to deal
  * with connection related events and exceptions are handled here.
  */
 public class ZooKeeperWatcher implements Watcher {

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java Sat Aug 28 00:31:18 2010
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -41,7 +41,7 @@ public class TestLogsCleaner {
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private ReplicationZookeeperWrapper zkHelper;
+  private ReplicationZookeeper zkHelper;
 
   /**
    * @throws java.lang.Exception

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Sat Aug 28 00:31:18 2010
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 // REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.junit.After;