You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/28 02:31:19 UTC
svn commit: r990310 - in /hbase/branches/0.90_master_rewrite/src:
main/java/org/apache/hadoop/hbase/replication/
main/java/org/apache/hadoop/hbase/replication/master/
main/java/org/apache/hadoop/hbase/replication/regionserver/
main/java/org/apache/hado...
Author: stack
Date: Sat Aug 28 00:31:18 2010
New Revision: 990310
URL: http://svn.apache.org/viewvc?rev=990310&view=rev
Log:
Bringing over ReplicationZookeeperWrapper to use new ZK regime.
I renamed RZW as RZ.
M src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
M src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Renamed RZW as RZ.
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
Javadoc.
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
Added getData.
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
javadoc.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
M src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
Renamed RZW as RZ.
A src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
D src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
Renamed
Added:
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
Removed:
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
Modified:
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=990310&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Sat Aug 28 00:31:18 2010
@@ -0,0 +1,558 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * This class serves as a helper for all things related to zookeeper
+ * in replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent
+ * for the master cluster:
+ * <p/>
+ * <pre>
+ * replication/
+ * master {contains a full cluster address}
+ * state {contains true or false}
+ * clusterId {contains a byte}
+ * peers/
+ * 1/ {contains a full cluster address}
+ * 2/
+ * ...
+ * rs/ {lists all RS that replicate}
+ * startcode1/ {lists all peer clusters}
+ * 1/ {lists hlogs to process}
+ * 10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ * 10.10.1.76%3A53488.123456790
+ * ...
+ * 2/
+ * ...
+ * startcode2/
+ * ...
+ * </pre>
+ */
+public class ReplicationZookeeper {
+ private static final Log LOG =
+ LogFactory.getLog(ReplicationZookeeper.class);
+ // Name of znode we use to lock when failover
+ private final static String RS_LOCK_ZNODE = "lock";
+ // Our handle on zookeeper
+ private final ZooKeeperWatcher zookeeper;
+ // Map of addresses of peer clusters with their ZKW
+ private final Map<String, ReplicationZookeeper> peerClusters;
+ // Path to the root replication znode
+ private final String replicationZNode;
+ // Path to the peer clusters znode
+ private final String peersZNode;
+ // Path to the znode that contains all RS that replicate
+ private final String rsZNode;
+ // Path to this region server's name under rsZNode
+ private final String rsServerNameZnode;
+ // Name of the replicationState znode
+ private final String replicationStateNodeName;
+ // If this RS is part of a master cluster
+ private final boolean replicationMaster;
+ private final Configuration conf;
+ // Is this cluster replicating at the moment?
+ private final AtomicBoolean replicating;
+ // Byte (stored as string here) that identifies this cluster
+ private final String clusterId;
+ // Abortable
+ private final Abortable abortable;
+
+ /**
+ * Constructor used by region servers, connects to the peer cluster right away.
+ *
+ * @param server hosting server; supplies the ZooKeeper watcher and is used as the abortable
+ * @param conf conf to use
+ * @param replicating atomic boolean to start/stop replication
+ * @param rsName the name of this region server, null if
+ * using this class only for its helper methods
+ * @throws IOException
+ * @throws KeeperException
+ */
+ public ReplicationZookeeper(final Server server,
+ final Configuration conf, final AtomicBoolean replicating, String rsName)
+ throws IOException, KeeperException {
+ this.abortable = server;
+ this.zookeeper = server.getZooKeeper();
+ this.conf = conf;
+ String replicationZNodeName =
+ conf.get("zookeeper.znode.replication", "replication");
+ String peersZNodeName =
+ conf.get("zookeeper.znode.replication.peers", "peers");
+ String repMasterZNodeName =
+ conf.get("zookeeper.znode.replication.master", "master");
+ this.replicationStateNodeName =
+ conf.get("zookeeper.znode.replication.state", "state");
+ String clusterIdZNodeName =
+ conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+ String rsZNodeName =
+ conf.get("zookeeper.znode.replication.rs", "rs");
+ String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+ this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+ this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+ this.peerClusters = new HashMap<String, ReplicationZookeeper>();
+ this.replicationZNode =
+ ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+ this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+ this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
+
+ this.replicating = replicating;
+ setReplicating();
+ String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
+ byte [] data = ZKUtil.getData(this.zookeeper, znode);
+ String idResult = Bytes.toString(data);
+ this.clusterId = idResult == null?
+ Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
+
+ znode = ZKUtil.joinZNode(this.replicationZNode, repMasterZNodeName);
+ data = ZKUtil.getData(this.zookeeper, znode);
+ String address = Bytes.toString(data);
+ this.replicationMaster = thisCluster.equals(address);
+ LOG.info("This cluster (" + thisCluster + ") is a " +
+ (this.replicationMaster ? "master" : "slave") + " for replication" +
+ ", compared with (" + address + ")");
+
+ if (rsName != null) {
+ this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, rsName);
+ // Set a tracker on the replication state znode
+ ReplicationStatusTracker tracker =
+ new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server);
+ tracker.start();
+
+ List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ if (znodes != null) {
+ for (String z : znodes) {
+ connectToPeer(z);
+ }
+ }
+ } else {
+ this.rsServerNameZnode = null;
+ }
+
+ }
+
+ /**
+ * Returns all region servers from given peer
+ *
+ * @param peerClusterId (byte) the cluster to interrogate
+ * @return addresses of all region servers
+ */
+ public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+ if (this.peerClusters.size() == 0) {
+ return new ArrayList<HServerAddress>(0);
+ }
+ ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId);
+ return zkw == null?
+ new ArrayList<HServerAddress>(0):
+ zkw.scanAddressDirectory(this.zookeeper.rsZNode);
+ }
+
+ /**
+ * Scan a directory of address data.
+ * @param znode The parent node
+ * @return The directory contents as HServerAddresses
+ */
+ public List<HServerAddress> scanAddressDirectory(String znode) {
+ List<HServerAddress> list = new ArrayList<HServerAddress>();
+ List<String> nodes = null;
+ try {
+ nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Scanning " + znode, e);
+ }
+ if (nodes == null) {
+ return list;
+ }
+ for (String node : nodes) {
+ String path = ZKUtil.joinZNode(znode, node);
+ list.add(readAddress(path));
+ }
+ return list;
+ }
+
+ private HServerAddress readAddress(String znode) {
+ byte [] data = null;
+ try {
+ data = ZKUtil.getData(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Getting address", e);
+ }
+ return new HServerAddress(Bytes.toString(data));
+ }
+
+ /**
+ * This method connects this cluster to another one and registers it
+ * in this region server's replication znode
+ * @param peerId id of the peer cluster
+ * @throws KeeperException
+ */
+ private void connectToPeer(String peerId) throws IOException, KeeperException {
+ String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+ byte [] data = ZKUtil.getData(this.zookeeper, znode);
+ String [] ensemble = Bytes.toString(data).split(":");
+ if (ensemble.length != 3) {
+ throw new IllegalArgumentException("Wrong format of cluster address: " +
+ Bytes.toStringBinary(data));
+ }
+ Configuration otherConf = new Configuration(this.conf);
+ otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+ otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+ otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+ // REENABLE -- FIX!!!!
+ /*
+ ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+ "connection to cluster: " + peerId);
+ zkw.registerListener(new ReplicationStatusWatcher());
+ this.peerClusters.put(peerId, zkw);
+ this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+ this.rsServerNameZnode, peerId));
+ */
+ LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+ }
+
+ /**
+ * This reads the state znode for replication and sets the atomic boolean
+ */
+ private void setReplicating() {
+ try {
+ byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
+ String value = Bytes.toString(data);
+ if (value == null) LOG.info(getRepStateNode() + " data is null");
+ else {
+ this.replicating.set(Boolean.parseBoolean(value));
+ LOG.info("Replication is now " + (this.replicating.get()?
+ "started" : "stopped"));
+ }
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
+ }
+ }
+
+ private String getRepStateNode() {
+ return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+ }
+
+ /**
+ * Add a new log to the list of hlogs in zookeeper
+ * @param filename name of the hlog's znode
+ * @param clusterId name of the cluster's znode
+ */
+ public void addLogToList(String filename, String clusterId) {
+ try {
+ String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+ znode = ZKUtil.joinZNode(znode, filename);
+ ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes(""));
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed add log to list", e);
+ }
+ }
+
+ /**
+ * Remove a log from the list of hlogs in zookeeper
+ * @param filename name of the hlog's znode
+ * @param clusterId name of the cluster's znode
+ */
+ public void removeLogFromList(String filename, String clusterId) {
+ try {
+ String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
+ znode = ZKUtil.joinZNode(znode, filename);
+ ZKUtil.deleteChildrenRecursively(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed remove from list", e);
+ }
+ }
+
+ /**
+ * Set the current position of the specified cluster in the current hlog
+ * @param filename filename name of the hlog's znode
+ * @param clusterId clusterId name of the cluster's znode
+ * @param position the position in the file
+ * @throws IOException
+ */
+ public void writeReplicationStatus(String filename, String clusterId,
+ long position) {
+ try {
+ String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+ znode = ZKUtil.joinZNode(znode, filename);
+ // Why serialize String of Long and not Long as bytes?
+ ZKUtil.createAndWatch(this.zookeeper, znode,
+ Bytes.toBytes(Long.toString(position)));
+ } catch (KeeperException e) {
+ this.abortable.abort("Writing replication status", e);
+ }
+ }
+
+ /**
+ * Get a list of all the other region servers in this cluster.
+ * Note: the children are currently listed without setting a watch.
+ * @param watch the watch to set (currently ignored)
+ * @return a list of server names
+ */
+ public List<String> getRegisteredRegionServers(Watcher watch) {
+ List<String> result = null;
+ try {
+ // TODO: This is rsZNode from zk which is like getListOfReplicators
+ // but maybe these are from different zk instances?
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Get list of registered region servers", e);
+ }
+ return result;
+ }
+
+ /**
+ * Get the list of the replicators that have queues, they can be alive, dead
+ * or simply from a previous run
+ * No watch is set when listing them.
+ * @return a list of server names
+ */
+ public List<String> getListOfReplicators() {
+ List<String> result = null;
+ try {
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Get list of replicators", e);
+ }
+ return result;
+ }
+
+ /**
+ * Get the list of peer clusters for the specified server names
+ * @param rs server name of the rs; its children are listed without
+ * setting a watch
+ * @return a list of peer cluster
+ */
+ public List<String> getListPeersForRS(String rs) {
+ String znode = ZKUtil.joinZNode(rsZNode, rs);
+ List<String> result = null;
+ try {
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Get list of peers for rs", e);
+ }
+ return result;
+ }
+
+ /**
+ * Get the list of hlogs for the specified region server and peer cluster
+ * @param rs server names of the rs
+ * @param id peer cluster; its hlog znodes are listed without
+ * setting a watch
+ * @return a list of hlogs
+ */
+ public List<String> getListHLogsForPeerForRS(String rs, String id) {
+ String znode = ZKUtil.joinZNode(rsZNode, rs);
+ znode = ZKUtil.joinZNode(znode, id);
+ List<String> result = null;
+ try {
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Get list of hlogs for peer", e);
+ }
+ return result;
+ }
+
+ /**
+ * Try to set a lock in another server's znode.
+ * @param znode the server names of the other server
+ * @return true in every case as currently written, including when creating the lock fails and abort is called
+ */
+ public boolean lockOtherRS(String znode) {
+ try {
+ String parent = ZKUtil.joinZNode(this.rsZNode, znode);
+ String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+ ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed lock other rs", e);
+ }
+ return true;
+ }
+
+ /**
+ * This method copies all the hlogs queues from another region server
+ * and returns them all sorted per peer cluster (appended with the dead
+ * server's znode)
+ * @param znode server names to copy
+ * @return all hlogs for all peers of that cluster; empty or partial if an error occurred
+ */
+ public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+ // TODO this method isn't atomic enough, we could start copying and then
+ // TODO fail for some reason and we would end up with znodes we don't want.
+ SortedMap<String,SortedSet<String>> queues =
+ new TreeMap<String,SortedSet<String>>();
+ try {
+ String nodePath = ZKUtil.joinZNode(rsZNode, znode);
+ List<String> clusters =
+ ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
+ // We have a lock znode in there, it will count as one.
+ if (clusters == null || clusters.size() <= 1) {
+ return queues;
+ }
+ // The lock isn't a peer cluster, remove it
+ clusters.remove(RS_LOCK_ZNODE);
+ for (String cluster : clusters) {
+ // We add the name of the recovered RS to the new znode, we can even
+ // do that for queues that were recovered 10 times giving a znode like
+ // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+ String newCluster = cluster+"-"+znode;
+ String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
+ ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+ HConstants.EMPTY_BYTE_ARRAY);
+ String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
+ List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+ // That region server didn't have anything to replicate for this cluster
+ if (hlogs == null || hlogs.size() == 0) {
+ continue;
+ }
+ SortedSet<String> logQueue = new TreeSet<String>();
+ queues.put(newCluster, logQueue);
+ for (String hlog : hlogs) {
+ String z = ZKUtil.joinZNode(clusterPath, hlog);
+ byte [] position = ZKUtil.getData(this.zookeeper, z);
+ LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
+ String child = ZKUtil.joinZNode(newClusterZnode, hlog);
+ ZKUtil.createAndWatch(this.zookeeper, child, position);
+ logQueue.add(hlog);
+ }
+ }
+ } catch (KeeperException e) {
+ this.abortable.abort("Copy queues from rs", e);
+ }
+ return queues;
+ }
+
+ /**
+ * Delete a complete queue of hlogs
+ * @param peerZnode znode of the peer cluster queue of hlogs to delete
+ */
+ public void deleteSource(String peerZnode) {
+ try {
+ ZKUtil.deleteChildrenRecursively(this.zookeeper,
+ ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed delete of " + peerZnode, e);
+ }
+ }
+
+ /**
+ * Recursive deletion of all znodes in specified rs' znode
+ * @param znode
+ */
+ public void deleteRsQueues(String znode) {
+ try {
+ ZKUtil.deleteChildrenRecursively(this.zookeeper,
+ ZKUtil.joinZNode(rsZNode, znode));
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed delete of " + znode, e);
+ }
+ }
+
+ /**
+ * Delete this cluster's queues
+ */
+ public void deleteOwnRSZNode() {
+ deleteRsQueues(this.rsServerNameZnode);
+ }
+
+ /**
+ * Get the position of the specified hlog in the specified peer znode
+ * @param peerId znode of the peer cluster
+ * @param hlog name of the hlog
+ * @return the position in that hlog
+ * @throws KeeperException
+ */
+ public long getHLogRepPosition(String peerId, String hlog)
+ throws KeeperException {
+ String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
+ String znode = ZKUtil.joinZNode(clusterZnode, hlog);
+ String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
+ return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+ }
+
+ /**
+ * Tells if this cluster replicates or not
+ *
+ * @return if this is a master
+ */
+ public boolean isReplicationMaster() {
+ return this.replicationMaster;
+ }
+
+ /**
+ * Get the identification of the cluster
+ *
+ * @return the id for the cluster
+ */
+ public String getClusterId() {
+ return this.clusterId;
+ }
+
+ /**
+ * Get a map of all peer clusters
+ * @return map of peer cluster, zk address to ZKW
+ */
+ public Map<String, ReplicationZookeeper> getPeerClusters() {
+ return this.peerClusters;
+ }
+
+ /**
+ * Tracker for status of the replication
+ */
+ public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
+ public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node,
+ Abortable abortable) {
+ super(watcher, node, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ super.nodeDataChanged(path);
+ setReplicating();
+ }
+ }
+}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Sat Aug 28 00:31:18 2010
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
// REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -45,7 +45,7 @@ public class ReplicationLogCleaner imple
private static final Log LOG =
LogFactory.getLog(ReplicationLogCleaner.class);
private Configuration conf;
- private ReplicationZookeeperWrapper zkHelper;
+ private ReplicationZookeeper zkHelper;
private Set<String> hlogs = new HashSet<String>();
/**
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Sat Aug 28 00:31:18 2010
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -47,7 +47,7 @@ public class Replication implements WALO
private final ReplicationSourceManager replicationManager;
private boolean replicationMaster;
private final AtomicBoolean replicating = new AtomicBoolean(true);
- private final ReplicationZookeeperWrapper zkHelper;
+ private final ReplicationZookeeper zkHelper;
private final Configuration conf;
private ReplicationSink replicationSink;
// Hosting server
@@ -68,7 +68,7 @@ public class Replication implements WALO
this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf);
if (replication) {
- this.zkHelper = new ReplicationZookeeperWrapper(server.getZooKeeper(),
+ this.zkHelper = new ReplicationZookeeper(server.getZooKeeper(),
this.conf, this.replicating, this.server.getServerName());
this.replicationMaster = zkHelper.isReplicationMaster();
this.replicationManager = this.replicationMaster ?
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat Aug 28 00:31:18 2010
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.ipc.HRegi
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -76,7 +76,7 @@ public class ReplicationSource extends T
private HLog.Entry[] entriesArray;
private HConnection conn;
// Helper class for zookeeper
- private ReplicationZookeeperWrapper zkHelper;
+ private ReplicationZookeeper zkHelper;
private Configuration conf;
// ratio of region servers to chose from a slave cluster
private float ratio;
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Sat Aug 28 00:31:18 2010
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.Stoppable
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -64,7 +64,7 @@ public class ReplicationSourceManager {
// Indicates if we are currently replicating
private final AtomicBoolean replicating;
// Helper for zookeeper
- private final ReplicationZookeeperWrapper zkHelper;
+ private final ReplicationZookeeper zkHelper;
// All about stopping
private final Stoppable stopper;
// All logs we are currently trackign
@@ -91,7 +91,7 @@ public class ReplicationSourceManager {
* @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived
*/
- public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+ public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
final Configuration conf,
final Stoppable stopper,
final FileSystem fs,
@@ -231,7 +231,7 @@ public class ReplicationSourceManager {
* Get the ZK help of this manager
* @return the helper
*/
- public ReplicationZookeeperWrapper getRepZkWrapper() {
+ public ReplicationZookeeper getRepZkWrapper() {
return zkHelper;
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java Sat Aug 28 00:31:18 2010
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HBaseConfiguration;
/**
- * Tool for reading ZooKeeper servers from HBase XML configuation and producing
+ * Tool for reading ZooKeeper servers from HBase XML configuration and producing
* a line-by-line list for use by bash scripts.
*/
public class ZKServerTool {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Sat Aug 28 00:31:18 2010
@@ -443,6 +443,31 @@ public class ZKUtil {
//
/**
+ * Get znode data. Does not set a watcher.
+ * @return ZNode data, or null if the node does not exist
+ */
+ public static byte [] getData(ZooKeeperWatcher zkw, String znode)
+ throws KeeperException {
+ try {
+ byte [] data = zkw.getZooKeeper().getData(znode, null, null);
+ zkw.debug("Retrieved " + data.length + " bytes of data from znode " + znode);
+ return data;
+ } catch (KeeperException.NoNodeException e) {
+ zkw.debug("Unable to get data of znode " + znode + " " +
+ "because node does not exist (not an error)");
+ return null;
+ } catch (KeeperException e) {
+ zkw.warn("Unable to get data of znode " + znode, e);
+ zkw.keeperException(e);
+ return null;
+ } catch (InterruptedException e) {
+ zkw.warn("Unable to get data of znode " + znode, e);
+ zkw.interruptedException(e);
+ return null;
+ }
+ }
+
+ /**
* Get the data at the specified znode and set a watch.
*
* Returns the data and sets a watch if the node exists. Returns null and no
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Sat Aug 28 00:31:18 2010
@@ -38,11 +38,11 @@ import org.apache.zookeeper.ZooKeeper;
* Acts as the single ZooKeeper Watcher. One instance of this is instantiated
* for each Master, RegionServer, and client process.
*
- * This is the only class that implements {@link Watcher}. Other internal
+ * <p>This is the only class that implements {@link Watcher}. Other internal
* classes which need to be notified of ZooKeeper events must register with
* the local instance of this watcher via {@link #registerListener}.
*
- * This class also holds and manages the connection to ZooKeeper. Code to deal
+ * <p>This class also holds and manages the connection to ZooKeeper. Code to deal
* with connection-related events and exceptions lives here.
*/
public class ZooKeeperWatcher implements Watcher {
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java Sat Aug 28 00:31:18 2010
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -41,7 +41,7 @@ public class TestLogsCleaner {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private ReplicationZookeeperWrapper zkHelper;
+ private ReplicationZookeeper zkHelper;
/**
* @throws java.lang.Exception
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=990310&r1=990309&r2=990310&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Sat Aug 28 00:31:18 2010
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.junit.After;