You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/08/29 22:48:02 UTC
svn commit: r1378714 [2/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/client/replication/
main/java/org/apache/hadoop/hbase/protobuf/generated/
main/java/org/apache/hadoop/hbase/replication/
main/java/org/apache/hadoop/hbase/...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Wed Aug 29 20:48:02 2012
@@ -37,9 +37,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -47,8 +50,11 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import com.google.protobuf.InvalidProtocolBufferException;
+
/**
* This class serves as a helper for all things related to zookeeper in
* replication.
@@ -85,11 +91,6 @@ public class ReplicationZookeeper implem
// Name of znode we use to lock when failover
private final static String RS_LOCK_ZNODE = "lock";
- // Values of znode which stores state of a peer
- public static enum PeerState {
- ENABLED, DISABLED
- };
-
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
// Map of peer clusters keyed by their id
@@ -104,7 +105,8 @@ public class ReplicationZookeeper implem
private String rsServerNameZnode;
// Name of the replicationState znode
private String replicationStateNodeName;
- // Name of zk node which stores peer state
+ // Name of zk node which stores peer state. The peer-state znode is under a
+ // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
private String peerStateNodeName;
private final Configuration conf;
// Is this cluster replicating at the moment?
@@ -116,15 +118,24 @@ public class ReplicationZookeeper implem
private ReplicationStatusTracker statusTracker;
/**
+ * ZNode content if enabled state.
+ */
+ // Public so it can be seen by test code.
+ public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+
+ /**
+ * ZNode content if disabled state.
+ */
+ static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+
+ /**
* Constructor used by clients of replication (like master and HBase clients)
* @param conf conf to use
* @param zk zk connection to use
* @throws IOException
*/
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
- final ZooKeeperWatcher zk)
- throws KeeperException {
-
+ final ZooKeeperWatcher zk) throws KeeperException {
this.conf = conf;
this.zookeeper = zk;
this.replicating = new AtomicBoolean();
@@ -156,27 +167,20 @@ public class ReplicationZookeeper implem
}
private void setZNodes(Abortable abortable) throws KeeperException {
- String replicationZNodeName =
- conf.get("zookeeper.znode.replication", "replication");
- String peersZNodeName =
- conf.get("zookeeper.znode.replication.peers", "peers");
- this.peerStateNodeName = conf.get(
- "zookeeper.znode.replication.peers.state", "peer-state");
- this.replicationStateNodeName =
- conf.get("zookeeper.znode.replication.state", "state");
- String rsZNodeName =
- conf.get("zookeeper.znode.replication.rs", "rs");
+ String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+ String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
+ this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+ this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
+ String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
- this.replicationZNode =
- ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+ this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
// Set a tracker on replicationStateNodeNode
- this.statusTracker =
- new ReplicationStatusTracker(this.zookeeper, abortable);
+ this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
statusTracker.start();
readReplicationStateZnode();
}
@@ -214,14 +218,22 @@ public class ReplicationZookeeper implem
try {
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
for (String id : ids) {
- peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
- ZKUtil.joinZNode(this.peersZNode, id))));
+ byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ String clusterKey = null;
+ try {
+ clusterKey = parsePeerFrom(bytes);
+ } catch (DeserializationException de) {
+ LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+ continue;
+ }
+ peers.put(id, clusterKey);
}
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return peers;
}
+
/**
* Returns all region servers from given peer
*
@@ -337,7 +349,13 @@ public class ReplicationZookeeper implem
public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
- String otherClusterKey = Bytes.toString(data);
+ String otherClusterKey = "";
+ try {
+ otherClusterKey = parsePeerFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse of cluster key from peerId=" + peerId
+ + ", specifically the content from the following znode: " + znode);
+ }
if (this.ourClusterKey.equals(otherClusterKey)) {
LOG.debug("Not connecting to " + peerId + " because it's us");
return null;
@@ -364,9 +382,9 @@ public class ReplicationZookeeper implem
public void setReplicating(boolean newState) throws KeeperException {
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
+ byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
ZKUtil.setData(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
- Bytes.toBytes(Boolean.toString(newState)));
+ ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
}
/**
@@ -401,15 +419,165 @@ public class ReplicationZookeeper implem
throw new IllegalArgumentException("Cannot add existing peer");
}
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- ZKUtil.createAndWatch(this.zookeeper,
- ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
- ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
- Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
+ ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
+ toByteArray(clusterKey));
+ // A peer is enabled by default
+ ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id), ENABLED_ZNODE_BYTES);
} catch (KeeperException e) {
throw new IOException("Unable to add peer", e);
}
}
+ /**
+  * Serializes a peer cluster key for storage in a peer znode.
+  * @param clusterKey cluster key to wrap in a <code>ReplicationPeer</code> message
+  * @return Serialized <code>ReplicationPeer</code> protobuf holding
+  *         <code>clusterKey</code>, with the pb magic prefix prepended; suitable
+  *         as content of a PEER_ID znode, i.e. /hbase/replication/peers/PEER_ID
+  */
+ static byte[] toByteArray(final String clusterKey) {
+   ZooKeeperProtos.ReplicationPeer peer =
+       ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build();
+   return ProtobufUtil.prependPBMagic(peer.toByteArray());
+ }
+
+ /**
+  * Serializes a replication state for storage in a state znode.
+  * @param state state to wrap in a <code>ReplicationState</code> message
+  * @return Serialized <code>ReplicationState</code> protobuf holding
+  *         <code>state</code>, with the pb magic prefix prepended. Usable as
+  *         content of the cluster state znode (/hbase/replication/state) or of
+  *         a peer-state znode
+  *         (/hbase/replication/peers/PEER_ID/peer-state).
+  */
+ static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
+   ZooKeeperProtos.ReplicationState message =
+       ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build();
+   return ProtobufUtil.prependPBMagic(message.toByteArray());
+ }
+
+ /**
+  * Serializes an hlog position for storage in a replication queue znode.
+  * @param position position to wrap in a <code>ReplicationHLogPosition</code> message
+  * @return Serialized <code>ReplicationHLogPosition</code> protobuf holding
+  *         <code>position</code>, with the pb magic prefix prepended.
+  */
+ static byte[] toByteArray(final long position) {
+   ZooKeeperProtos.ReplicationHLogPosition message =
+       ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build();
+   return ProtobufUtil.prependPBMagic(message.toByteArray());
+ }
+
+ /**
+  * Serializes the owner of a replication lock taken during region server
+  * fail over.
+  * @param lockOwner owner to wrap in a <code>ReplicationLock</code> message
+  * @return Serialized <code>ReplicationLock</code> protobuf holding
+  *         <code>lockOwner</code>, with the pb magic prefix prepended.
+  */
+ static byte[] lockToByteArray(final String lockOwner) {
+   ZooKeeperProtos.ReplicationLock lock =
+       ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build();
+   return ProtobufUtil.prependPBMagic(lock.toByteArray());
+ }
+
+ /**
+  * Parses the cluster key out of the content of a peer znode. Content that
+  * starts with the pb magic prefix is decoded as a
+  * <code>ReplicationPeer</code> protobuf; any other non-empty content is read
+  * as a plain string.
+  * @param bytes Content of a peer znode. A null array is treated like empty
+  *          content.
+  * @return ClusterKey parsed from the passed bytes, or the empty string when
+  *         there is no content.
+  * @throws DeserializationException if pb-prefixed content cannot be parsed
+  */
+ static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+   // Guard first: without it a null array reaches bytes.length and throws a
+   // NullPointerException instead of yielding the "no content" result.
+   if (bytes == null || bytes.length == 0) {
+     return "";
+   }
+   if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
+     // No pb magic prefix: the content is the cluster key as a plain string.
+     return Bytes.toString(bytes);
+   }
+   int pblen = ProtobufUtil.lengthOfPBMagic();
+   try {
+     // Skip the magic prefix and decode the remainder as the protobuf message.
+     return ZooKeeperProtos.ReplicationPeer.newBuilder()
+         .mergeFrom(bytes, pblen, bytes.length - pblen).build().getClusterkey();
+   } catch (InvalidProtocolBufferException e) {
+     throw new DeserializationException(e);
+   }
+ }
+
+ /**
+  * Decodes the replication state stored in a state znode. The content must
+  * start with the pb magic prefix, followed by a serialized
+  * <code>ReplicationState</code> message.
+  * @param bytes Content of a state znode.
+  * @return State parsed from the passed bytes.
+  * @throws DeserializationException
+  */
+ static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+     throws DeserializationException {
+   ProtobufUtil.expectPBMagicPrefix(bytes);
+   final int magicLength = ProtobufUtil.lengthOfPBMagic();
+   try {
+     // Everything after the magic prefix is the protobuf payload.
+     return ZooKeeperProtos.ReplicationState.newBuilder()
+         .mergeFrom(bytes, magicLength, bytes.length - magicLength).build().getState();
+   } catch (InvalidProtocolBufferException ipbe) {
+     throw new DeserializationException(ipbe);
+   }
+ }
+
+ /**
+  * Parses an HLog position out of the content of a position znode. Content
+  * that starts with the pb magic prefix is decoded as a
+  * <code>ReplicationHLogPosition</code> protobuf. Any other non-empty content
+  * is parsed as the decimal text of the position, which is the form written
+  * by <code>Bytes.toBytes(Long.toString(position))</code>.
+  * @param bytes - Content of a HLog position znode. A null array is treated
+  *          like empty content.
+  * @return long - The current HLog position, or 0 when there is no content.
+  * @throws DeserializationException if the content is neither a valid
+  *           protobuf message nor decimal text
+  */
+ static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+   // Guard first: without it a null array reaches bytes.length and throws a
+   // NullPointerException instead of yielding position 0.
+   if (bytes == null || bytes.length == 0) {
+     return 0;
+   }
+   if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+     int pblen = ProtobufUtil.lengthOfPBMagic();
+     try {
+       // Skip the magic prefix and decode the remainder as the protobuf message.
+       return ZooKeeperProtos.ReplicationHLogPosition.newBuilder()
+           .mergeFrom(bytes, pblen, bytes.length - pblen).build().getPosition();
+     } catch (InvalidProtocolBufferException e) {
+       throw new DeserializationException(e);
+     }
+   }
+   // No pb magic prefix: the content is the decimal string of the position,
+   // not an 8-byte binary long, so Bytes.toLong would misread or reject it.
+   try {
+     return Long.parseLong(Bytes.toString(bytes));
+   } catch (NumberFormatException e) {
+     // Report as a checked parse failure so callers' existing handling applies.
+     throw new DeserializationException(e);
+   }
+ }
+
+ /**
+  * Parses the lock owner out of the content of a lock znode. Content that
+  * starts with the pb magic prefix is decoded as a
+  * <code>ReplicationLock</code> protobuf; any other non-empty content is read
+  * as a plain string.
+  * @param bytes - Content of a lock znode. A null array is treated like empty
+  *          content.
+  * @return String - The owner of the lock, or the empty string when there is
+  *         no content.
+  * @throws DeserializationException if pb-prefixed content cannot be parsed
+  */
+ static String parseLockOwnerFrom(final byte[] bytes) throws DeserializationException {
+   // Guard first: without it a null array reaches bytes.length and throws a
+   // NullPointerException instead of yielding the "no content" result.
+   if (bytes == null || bytes.length == 0) {
+     return "";
+   }
+   if (!ProtobufUtil.isPBMagicPrefix(bytes)) {
+     // No pb magic prefix: the content is the owner name as a plain string.
+     return Bytes.toString(bytes);
+   }
+   int pblen = ProtobufUtil.lengthOfPBMagic();
+   try {
+     // Skip the magic prefix and decode the remainder as the protobuf message.
+     return ZooKeeperProtos.ReplicationLock.newBuilder()
+         .mergeFrom(bytes, pblen, bytes.length - pblen).build().getLockOwner();
+   } catch (InvalidProtocolBufferException e) {
+     throw new DeserializationException(e);
+   }
+ }
+
private boolean peerExists(String id) throws KeeperException {
return ZKUtil.checkExists(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
@@ -423,7 +591,7 @@ public class ReplicationZookeeper implem
* Thrown when the peer doesn't exist
*/
public void enablePeer(String id) throws IOException {
- changePeerState(id, PeerState.ENABLED);
+ changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
LOG.info("peer " + id + " is enabled");
}
@@ -435,22 +603,23 @@ public class ReplicationZookeeper implem
* Thrown when the peer doesn't exist
*/
public void disablePeer(String id) throws IOException {
- changePeerState(id, PeerState.DISABLED);
+ changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
LOG.info("peer " + id + " is disabled");
}
- private void changePeerState(String id, PeerState state) throws IOException {
+ private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
+ throws IOException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("peer " + id + " is not registered");
}
String peerStateZNode = getPeerStateNode(id);
+ byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
+ : DISABLED_ZNODE_BYTES;
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
- ZKUtil.setData(this.zookeeper, peerStateZNode,
- Bytes.toBytes(state.name()));
+ ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
} else {
- ZKUtil.createAndWatch(zookeeper, peerStateZNode,
- Bytes.toBytes(state.name()));
+ ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
}
LOG.info("state of the peer " + id + " changed to " + state.name());
} catch (KeeperException e) {
@@ -459,18 +628,6 @@ public class ReplicationZookeeper implem
}
/**
- * Get state of the peer. This method checks the state by connecting to ZK.
- *
- * @param id peer's identifier
- * @return current state of the peer
- */
- public PeerState getPeerState(String id) throws KeeperException {
- byte[] peerStateBytes = ZKUtil
- .getData(this.zookeeper, getPeerStateNode(id));
- return PeerState.valueOf(Bytes.toString(peerStateBytes));
- }
-
- /**
* Check whether the peer is enabled or not. This method checks the atomic
* boolean of ReplicationPeer locally.
*
@@ -487,8 +644,7 @@ public class ReplicationZookeeper implem
}
private String getPeerStateNode(String id) {
- return ZKUtil.joinZNode(this.peersZNode,
- ZKUtil.joinZNode(id, this.peerStateNodeName));
+ return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
}
/**
@@ -516,7 +672,11 @@ public class ReplicationZookeeper implem
setReplicating(true);
return true;
}
- return Boolean.parseBoolean(Bytes.toString(data));
+ try {
+ return isPeerEnabled(data);
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
}
private String getRepStateNode() {
@@ -563,8 +723,7 @@ public class ReplicationZookeeper implem
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and not Long as bytes?
- ZKUtil.setData(this.zookeeper, znode,
- Bytes.toBytes(Long.toString(position)));
+ ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
} catch (KeeperException e) {
this.abortable.abort("Writing replication status", e);
}
@@ -648,7 +807,7 @@ public class ReplicationZookeeper implem
return false;
}
String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
- ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
+ ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode));
} catch (KeeperException e) {
// This exception will pop up if the znode under which we're trying to
// create the lock is already deleted by another region server, meaning
@@ -707,10 +866,18 @@ public class ReplicationZookeeper implem
queues.put(newCluster, logQueue);
for (String hlog : hlogs) {
String z = ZKUtil.joinZNode(clusterPath, hlog);
- byte [] position = ZKUtil.getData(this.zookeeper, z);
- LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
+ byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+ long position = 0;
+ try {
+ position = parseHLogPositionFrom(positionBytes);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse of hlog position from the following znode: " + z);
+ }
+ LOG.debug("Creating " + hlog + " with data " + position);
String child = ZKUtil.joinZNode(newClusterZnode, hlog);
- ZKUtil.createAndWatch(this.zookeeper, child, position);
+ // Position doesn't actually change, we are just deserializing it for
+ // logging, so just use the already serialized version
+ ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
logQueue.add(hlog);
}
}
@@ -797,8 +964,16 @@ public class ReplicationZookeeper implem
throws KeeperException {
String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
String znode = ZKUtil.joinZNode(clusterZnode, hlog);
- String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
- return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+ byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
+ try {
+ return parseHLogPositionFrom(bytes);
+ } catch (DeserializationException de) {
+ LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog
+ + "znode content, continuing.");
+ }
+ // if we can not parse the position, start at the beginning of the hlog file
+ // again
+ return 0;
}
public void registerRegionServerListener(ZooKeeperListener listener) {
@@ -847,6 +1022,35 @@ public class ReplicationZookeeper implem
}
/**
+ * Utility method to ensure an ENABLED znode is in place; if not present, we
+ * create it.
+ * @param zookeeper
+ * @param path Path to znode to check
+ * @return True if we created the znode.
+ * @throws NodeExistsException
+ * @throws KeeperException
+ */
+ static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+     throws NodeExistsException, KeeperException {
+   // checkExists reports -1 when there is no znode at the path.
+   final boolean absent = ZKUtil.checkExists(zookeeper, path) == -1;
+   if (absent) {
+     // Nothing there yet: create the znode holding the ENABLED state.
+     ZKUtil.createAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
+   }
+   return absent;
+ }
+
+ /**
+  * Tells whether serialized state-znode content says ENABLED.
+  * @param bytes content of a state znode, as accepted by
+  *          {@link #parseStateFrom(byte[])}
+  * @return True if the passed in <code>bytes</code> are those of a pb
+  *         serialized ENABLED state.
+  * @throws DeserializationException
+  */
+ static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
+   return parseStateFrom(bytes) == ZooKeeperProtos.ReplicationState.State.ENABLED;
+ }
+
+ /**
* Tracker for status of the replication
*/
public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Wed Aug 29 20:48:02 2012
@@ -93,8 +93,8 @@ public class Replication implements WALA
throw new IOException("Failed replication handler create " +
"(replicating=" + this.replicating, ke);
}
- this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
- this.server, fs, this.replicating, logDir, oldLogDir) ;
+ this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
+ this.replicating, logDir, oldLogDir);
} else {
this.replicationManager = null;
this.zkHelper = null;
Modified: hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto Wed Aug 29 20:48:02 2012
@@ -98,3 +98,37 @@ message Table {
// for more.
required State state = 1 [default = ENABLED];
}
+
+/**
+ * Used by replication. Holds a replication peer key, i.e. the cluster key
+ * that identifies the peer (slave) cluster.
+ */
+message ReplicationPeer {
+ // clusterKey is the concatenation of the slave cluster's
+ // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+ required string clusterkey = 1;
+}
+
+/**
+ * Used by replication. Holds whether enabled or disabled.
+ */
+message ReplicationState {
+ // The two possible replication states.
+ enum State {
+ ENABLED = 0;
+ DISABLED = 1;
+ }
+ // Required, and declared without a default, so it must always be set.
+ required State state = 1;
+}
+
+/**
+ * Used by replication. Holds the current position in an HLog file.
+ */
+message ReplicationHLogPosition {
+ // The position itself, as a 64-bit signed integer.
+ required int64 position = 1;
+}
+
+/**
+ * Used by replication. Used to lock a region server during failover.
+ */
+message ReplicationLock {
+ // Name identifying whoever holds the lock.
+ required string lockOwner = 1;
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1378714&r1=1378713&r2=1378714&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Wed Aug 29 20:48:02 2012
@@ -112,9 +112,9 @@ public class TestReplicationSourceManage
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
- Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name()));
+ ReplicationZookeeper.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
- ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
+ ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
@@ -135,8 +135,6 @@ public class TestReplicationSourceManage
htd.addFamily(col);
hri = new HRegionInfo(htd.getName(), r1, r2);
-
-
}
@AfterClass