You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/08/29 22:48:02 UTC

svn commit: r1378714 [2/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/replication/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/apache/hadoop/hbase/replication/ main/java/org/apache/hadoop/hbase/...

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Wed Aug 29 20:48:02 2012
@@ -37,9 +37,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -47,8 +50,11 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * This class serves as a helper for all things related to zookeeper in
  * replication.
@@ -85,11 +91,6 @@ public class ReplicationZookeeper implem
   // Name of znode we use to lock when failover
   private final static String RS_LOCK_ZNODE = "lock";
 
-  // Values of znode which stores state of a peer
-  public static enum PeerState {
-    ENABLED, DISABLED
-  };
-
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
   // Map of peer clusters keyed by their id
@@ -104,7 +105,8 @@ public class ReplicationZookeeper implem
   private String rsServerNameZnode;
   // Name node if the replicationState znode
   private String replicationStateNodeName;
-  // Name of zk node which stores peer state
+  // Name of zk node which stores peer state. The peer-state znode is under a
+  // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
   private String peerStateNodeName;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
@@ -116,15 +118,24 @@ public class ReplicationZookeeper implem
   private ReplicationStatusTracker statusTracker;
 
   /**
+   * ZNode content if enabled state.
+   */
+  // Public so it can be seen by test code.
+  public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+
+  /**
+   * ZNode content if disabled state.
+   */
+  static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+
+  /**
    * Constructor used by clients of replication (like master and HBase clients)
    * @param conf  conf to use
    * @param zk    zk connection to use
    * @throws IOException
    */
   public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
-                              final ZooKeeperWatcher zk)
-    throws KeeperException {
-
+      final ZooKeeperWatcher zk) throws KeeperException {
     this.conf = conf;
     this.zookeeper = zk;
     this.replicating = new AtomicBoolean();
@@ -156,27 +167,20 @@ public class ReplicationZookeeper implem
   }
 
   private void setZNodes(Abortable abortable) throws KeeperException {
-    String replicationZNodeName =
-        conf.get("zookeeper.znode.replication", "replication");
-    String peersZNodeName =
-        conf.get("zookeeper.znode.replication.peers", "peers");
-    this.peerStateNodeName = conf.get(
-        "zookeeper.znode.replication.peers.state", "peer-state");
-    this.replicationStateNodeName =
-        conf.get("zookeeper.znode.replication.state", "state");
-    String rsZNodeName =
-        conf.get("zookeeper.znode.replication.rs", "rs");
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
+    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+    this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
+    String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
     this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
-    this.replicationZNode =
-      ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+    this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
     ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
 
     // Set a tracker on replicationStateNodeNode
-    this.statusTracker =
-        new ReplicationStatusTracker(this.zookeeper, abortable);
+    this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
     statusTracker.start();
     readReplicationStateZnode();
   }
@@ -214,14 +218,22 @@ public class ReplicationZookeeper implem
     try {
       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
       for (String id : ids) {
-        peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
-            ZKUtil.joinZNode(this.peersZNode, id))));
+        byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+        String clusterKey = null;
+        try {
+          clusterKey = parsePeerFrom(bytes);
+        } catch (DeserializationException de) {
+          LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+          continue;
+        }
+        peers.put(id, clusterKey);
       }
     } catch (KeeperException e) {
       this.abortable.abort("Cannot get the list of peers ", e);
     }
     return peers;
   }
+
   /**
    * Returns all region servers from given peer
    *
@@ -337,7 +349,13 @@ public class ReplicationZookeeper implem
   public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
     byte [] data = ZKUtil.getData(this.zookeeper, znode);
-    String otherClusterKey = Bytes.toString(data);
+    String otherClusterKey = "";
+    try {
+      otherClusterKey = parsePeerFrom(data);
+    } catch (DeserializationException e) {
+      LOG.warn("Failed parse of cluster key from peerId=" + peerId
+          + ", specifically the content from the following znode: " + znode);
+    }
     if (this.ourClusterKey.equals(otherClusterKey)) {
       LOG.debug("Not connecting to " + peerId + " because it's us");
       return null;
@@ -364,9 +382,9 @@ public class ReplicationZookeeper implem
   public void setReplicating(boolean newState) throws KeeperException {
     ZKUtil.createWithParents(this.zookeeper,
         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
+    byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
     ZKUtil.setData(this.zookeeper,
-        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
-        Bytes.toBytes(Boolean.toString(newState)));
+      ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
   }
 
   /**
@@ -401,15 +419,165 @@ public class ReplicationZookeeper implem
         throw new IllegalArgumentException("Cannot add existing peer");
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-      ZKUtil.createAndWatch(this.zookeeper,
-          ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
-      ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
-          Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
+      ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
+        toByteArray(clusterKey));
+      // A peer is enabled by default
+      ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id), ENABLED_ZNODE_BYTES);
     } catch (KeeperException e) {
       throw new IOException("Unable to add peer", e);
     }
   }
 
+  /**
+   * @param clusterKey
+   * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix
+   *         prepended suitable for use as content of a this.peersZNode; i.e.
+   *         the content of PEER_ID znode under /hbase/replication/peers/PEER_ID
+   */
+  static byte[] toByteArray(final String clusterKey) {
+    byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
+        .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param state
+   * @return Serialized protobuf of <code>state</code> with pb magic prefix
+   *         prepended suitable for use as content of either the cluster state
+   *         znode -- whether or not we should be replicating kept in
+   *         /hbase/replication/state -- or as content of a peer-state znode
+   *         under a peer cluster id as in
+   *         /hbase/replication/peers/PEER_ID/peer-state.
+   */
+  static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
+    byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
+        .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param position
+   * @return Serialized protobuf of <code>position</code> with pb magic prefix
+   *         prepended suitable for use as content of an hlog position in a
+   *         replication queue.
+   */
+  static byte[] toByteArray(
+      final long position) {
+    byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
+        .build().toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param lockOwner
+   * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
+   *         prepended suitable for use as content of an replication lock during
+   *         region server fail over.
+   */
+  static byte[] lockToByteArray(
+      final String lockOwner) {
+    byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
+        .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param bytes Content of a peer znode.
+   * @return ClusterKey parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
+          .newBuilder();
+      ZooKeeperProtos.ReplicationPeer peer;
+      try {
+        peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return peer.getClusterkey();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toString(bytes);
+      }
+      return "";
+    }
+  }
+
+  /**
+   * @param bytes Content of a state znode.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    int pblen = ProtobufUtil.lengthOfPBMagic();
+    ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
+        .newBuilder();
+    ZooKeeperProtos.ReplicationState state;
+    try {
+      state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      return state.getState();
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * @param bytes - Content of a HLog position znode.
+   * @return long - The current HLog position.
+   * @throws DeserializationException
+   */
+  static long parseHLogPositionFrom(
+      final byte[] bytes) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition
+          .newBuilder();
+      ZooKeeperProtos.ReplicationHLogPosition position;
+      try {
+        position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return position.getPosition();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toLong(bytes);
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * @param bytes - Content of a lock znode.
+   * @return String - The owner of the lock.
+   * @throws DeserializationException
+   */
+  static String parseLockOwnerFrom(
+      final byte[] bytes) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
+          .newBuilder();
+      ZooKeeperProtos.ReplicationLock lock;
+      try {
+        lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return lock.getLockOwner();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toString(bytes);
+      }
+      return "";
+    }
+  }
+
   private boolean peerExists(String id) throws KeeperException {
     return ZKUtil.checkExists(this.zookeeper,
           ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
@@ -423,7 +591,7 @@ public class ReplicationZookeeper implem
    *           Thrown when the peer doesn't exist
    */
   public void enablePeer(String id) throws IOException {
-    changePeerState(id, PeerState.ENABLED);
+    changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
     LOG.info("peer " + id + " is enabled");
   }
 
@@ -435,22 +603,23 @@ public class ReplicationZookeeper implem
    *           Thrown when the peer doesn't exist
    */
   public void disablePeer(String id) throws IOException {
-    changePeerState(id, PeerState.DISABLED);
+    changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
     LOG.info("peer " + id + " is disabled");
   }
 
-  private void changePeerState(String id, PeerState state) throws IOException {
+  private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
+      throws IOException {
     try {
       if (!peerExists(id)) {
         throw new IllegalArgumentException("peer " + id + " is not registered");
       }
       String peerStateZNode = getPeerStateNode(id);
+      byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
+          : DISABLED_ZNODE_BYTES;
       if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
-        ZKUtil.setData(this.zookeeper, peerStateZNode,
-          Bytes.toBytes(state.name()));
+        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
       } else {
-        ZKUtil.createAndWatch(zookeeper, peerStateZNode,
-            Bytes.toBytes(state.name()));
+        ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
       }
       LOG.info("state of the peer " + id + " changed to " + state.name());
     } catch (KeeperException e) {
@@ -459,18 +628,6 @@ public class ReplicationZookeeper implem
   }
 
   /**
-   * Get state of the peer. This method checks the state by connecting to ZK.
-   *
-   * @param id peer's identifier
-   * @return current state of the peer
-   */
-  public PeerState getPeerState(String id) throws KeeperException {
-    byte[] peerStateBytes = ZKUtil
-        .getData(this.zookeeper, getPeerStateNode(id));
-    return PeerState.valueOf(Bytes.toString(peerStateBytes));
-  }
-
-  /**
    * Check whether the peer is enabled or not. This method checks the atomic
    * boolean of ReplicationPeer locally.
    *
@@ -487,8 +644,7 @@ public class ReplicationZookeeper implem
   }
 
   private String getPeerStateNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode,
-        ZKUtil.joinZNode(id, this.peerStateNodeName));
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
   }
 
   /**
@@ -516,7 +672,11 @@ public class ReplicationZookeeper implem
       setReplicating(true);
       return true;
     }
-    return Boolean.parseBoolean(Bytes.toString(data));
+    try {
+      return isPeerEnabled(data);
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
   }
 
   private String getRepStateNode() {
@@ -563,8 +723,7 @@ public class ReplicationZookeeper implem
       String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
       znode = ZKUtil.joinZNode(znode, filename);
       // Why serialize String of Long and note Long as bytes?
-      ZKUtil.setData(this.zookeeper, znode,
-        Bytes.toBytes(Long.toString(position)));
+      ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
     } catch (KeeperException e) {
       this.abortable.abort("Writing replication status", e);
     }
@@ -648,7 +807,7 @@ public class ReplicationZookeeper implem
         return false;
       }
       String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
-      ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
+      ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode));
     } catch (KeeperException e) {
       // This exception will pop up if the znode under which we're trying to
       // create the lock is already deleted by another region server, meaning
@@ -707,10 +866,18 @@ public class ReplicationZookeeper implem
         queues.put(newCluster, logQueue);
         for (String hlog : hlogs) {
           String z = ZKUtil.joinZNode(clusterPath, hlog);
-          byte [] position = ZKUtil.getData(this.zookeeper, z);
-          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
+          byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+          long position = 0;
+          try {
+            position = parseHLogPositionFrom(positionBytes);
+          } catch (DeserializationException e) {
+            LOG.warn("Failed parse of hlog position from the following znode: " + z);
+          }
+          LOG.debug("Creating " + hlog + " with data " + position);
           String child = ZKUtil.joinZNode(newClusterZnode, hlog);
-          ZKUtil.createAndWatch(this.zookeeper, child, position);
+          // Position doesn't actually change, we are just deserializing it for
+          // logging, so just use the already serialized version
+          ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
           logQueue.add(hlog);
         }
       }
@@ -797,8 +964,16 @@ public class ReplicationZookeeper implem
   throws KeeperException {
     String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
     String znode = ZKUtil.joinZNode(clusterZnode, hlog);
-    String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
-    return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+    byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
+    try {
+      return parseHLogPositionFrom(bytes);
+    } catch (DeserializationException de) {
+      LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog
+          + "znode content, continuing.");
+    }
+    // if we can not parse the position, start at the beginning of the hlog file
+    // again
+    return 0;
   }
 
   public void registerRegionServerListener(ZooKeeperListener listener) {
@@ -847,6 +1022,35 @@ public class ReplicationZookeeper implem
   }
 
   /**
+   * Utility method to ensure an ENABLED znode is in place; if not present, we
+   * create it.
+   * @param zookeeper
+   * @param path Path to znode to check
+   * @return True if we created the znode.
+   * @throws NodeExistsException
+   * @throws KeeperException
+   */
+  static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+      throws NodeExistsException, KeeperException {
+    if (ZKUtil.checkExists(zookeeper, path) == -1) {
+      ZKUtil.createAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @param bytes
+   * @return True if the passed in <code>bytes</code> are those of a pb
+   *         serialized ENABLED state.
+   * @throws DeserializationException
+   */
+  static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
+    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+  }
+
+  /**
    * Tracker for status of the replication
    */
   public class ReplicationStatusTracker extends ZooKeeperNodeTracker {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Wed Aug 29 20:48:02 2012
@@ -93,8 +93,8 @@ public class Replication implements WALA
         throw new IOException("Failed replication handler create " +
            "(replicating=" + this.replicating, ke);
       }
-      this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
-          this.server, fs, this.replicating, logDir, oldLogDir) ;
+      this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
+          this.replicating, logDir, oldLogDir);
     } else {
       this.replicationManager = null;
       this.zkHelper = null;

Modified: hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto Wed Aug 29 20:48:02 2012
@@ -98,3 +98,37 @@ message Table {
   // for more.
   required State state = 1 [default = ENABLED];
 }
+
+/**
+ * Used by replication. Holds a replication peer key.
+ */
+message ReplicationPeer {
+  // clusterKey is the concatenation of the slave cluster's
+  // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+  required string clusterkey = 1;
+}
+
+/**
+ * Used by replication. Holds whether enabled or disabled
+ */
+message ReplicationState {
+  enum State {
+    ENABLED = 0;
+    DISABLED = 1;
+  }
+  required State state = 1;
+}
+
+/**
+ * Used by replication. Holds the current position in an HLog file.
+ */
+message ReplicationHLogPosition {
+  required int64 position = 1;
+}
+
+/**
+ * Used by replication. Used to lock a region server during failover.
+ */
+message ReplicationLock {
+  required string lockOwner = 1;
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Wed Aug 29 20:48:02 2012
@@ -112,9 +112,9 @@ public class TestReplicationSourceManage
             + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
     ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
-        Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name()));
+      ReplicationZookeeper.ENABLED_ZNODE_BYTES);
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
-    ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES);
 
     replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
     manager = replication.getReplicationManager();
@@ -135,8 +135,6 @@ public class TestReplicationSourceManage
     htd.addFamily(col);
 
     hri = new HRegionInfo(htd.getName(), r1, r2);
-
-
   }
 
   @AfterClass