You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC
svn commit: r990018 [5/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/
bin/replication/ src/assembly/ src/docbkx/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/filter/ s...
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,494 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class serves as a helper for all things related to zookeeper
+ * in replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent
+ * for the master cluster:
+ * <p/>
+ * <pre>
+ * replication/
+ *   master     {contains a full cluster address}
+ *   state      {contains true or false}
+ *   clusterId  {contains a byte}
+ *   peers/
+ *     1/  {contains a full cluster address}
+ *     2/
+ *     ...
+ *   rs/  {lists all RS that replicate}
+ *     startcode1/  {lists all peer clusters}
+ *       1/  {lists hlogs to process}
+ *         10.10.1.76%3A53488.123456789  {contains nothing or a position}
+ *         10.10.1.76%3A53488.123456790
+ *         ...
+ *       2/
+ *       ...
+ *     startcode2/
+ *     ...
+ * </pre>
+ * <p/>
+ * NOTE: the implementation is currently commented out (see the REENABLE
+ * marker in the class body), so this class has no members on this branch.
+ */
+public class ReplicationZookeeperWrapper {
+ // REENABLE: everything below is commented out, presumably until the
+ // ZooKeeperWrapper it depends on is available again on this branch
+ // (TODO confirm). Until then this class declares no members.
+
+// private static final Log LOG =
+// LogFactory.getLog(ReplicationZookeeperWrapper.class);
+// // Name of znode we use to lock when failover
+// private final static String RS_LOCK_ZNODE = "lock";
+// // NOTE(review): this block comment also disables the zookeeperWrapper and
+// // peerClusters fields, which the constructor and methods below assign and
+// // read; drop the /* */ when re-enabling or the class will not compile.
+// /*
+// // Our handle on zookeeper
+// private final ZooKeeperWrapper zookeeperWrapper;
+// // Map of addresses of peer clusters with their ZKW
+// private final Map<String, ZooKeeperWrapper> peerClusters;*/
+// // Path to the root replication znode
+// private final String replicationZNode;
+// // Path to the peer clusters znode
+// private final String peersZNode;
+// // Path to the znode that contains all RS that replicate
+// private final String rsZNode;
+// // Path to this region server's name under rsZNode
+// private final String rsServerNameZnode;
+// // Name of the replicationState znode
+// private final String replicationStateNodeName;
+// // If this RS is part of a master cluster
+// private final boolean replicationMaster;
+// private final Configuration conf;
+// // Is this cluster replicating at the moment?
+// private final AtomicBoolean replicating;
+// // Byte (stored as string here) that identifies this cluster
+// private final String clusterId;
+//
+// /**
+// * Constructor used by region servers, connects to the peer cluster right away.
+// *
+// * @param zookeeperWrapper zkw to wrap
+// * @param conf conf to use
+// * @param replicating atomic boolean to start/stop replication
+// * @param rsName the name of this region server, null if
+// * using RZH only to use the helping methods
+// * @throws IOException
+// */
+// public ReplicationZookeeperWrapper(
+// ZooKeeperWrapper zookeeperWrapper, Configuration conf,
+// final AtomicBoolean replicating, String rsName) throws IOException {
+// this.zookeeperWrapper = zookeeperWrapper;
+// this.conf = conf;
+// String replicationZNodeName =
+// conf.get("zookeeper.znode.replication", "replication");
+// String peersZNodeName =
+// conf.get("zookeeper.znode.replication.peers", "peers");
+// String repMasterZNodeName =
+// conf.get("zookeeper.znode.replication.master", "master");
+// this.replicationStateNodeName =
+// conf.get("zookeeper.znode.replication.state", "state");
+// String clusterIdZNodeName =
+// conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+// String rsZNodeName =
+// conf.get("zookeeper.znode.replication.rs", "rs");
+// String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+// this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+// this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+//
+// this.peerClusters = new HashMap<String, ZooKeeperWrapper>();
+// this.replicationZNode = zookeeperWrapper.getZNode(
+// zookeeperWrapper.getParentZNode(), replicationZNodeName);
+// this.peersZNode =
+// zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
+// this.rsZNode =
+// zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
+//
+// this.replicating = replicating;
+// setReplicating();
+// String idResult = Bytes.toString(
+// this.zookeeperWrapper.getData(this.replicationZNode,
+// clusterIdZNodeName));
+// this.clusterId =
+// idResult == null ?
+// Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
+// String address = Bytes.toString(
+// this.zookeeperWrapper.getData(this.replicationZNode,
+// repMasterZNodeName));
+// this.replicationMaster = thisCluster.equals(address);
+// LOG.info("This cluster (" + thisCluster + ") is a "
+// + (this.replicationMaster ? "master" : "slave") + " for replication" +
+// ", compared with (" + address + ")");
+// if (rsName != null) {
+// this.rsServerNameZnode =
+// this.zookeeperWrapper.getZNode(rsZNode, rsName);
+// List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
+// new ReplicationStatusWatcher());
+// if (znodes != null) {
+// for (String znode : znodes) {
+// connectToPeer(znode);
+// }
+// }
+// } else {
+// this.rsServerNameZnode = null;
+// }
+//
+// }
+//
+// /**
+// * Returns all region servers from given peer
+// *
+// * @param peerClusterId (byte) the cluster to interrogate
+// * @return addresses of all region servers
+// */
+// public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+// if (this.peerClusters.size() == 0) {
+// return new ArrayList<HServerAddress>(0);
+// }
+// ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
+// return zkw == null?
+// new ArrayList<HServerAddress>(0) : zkw.scanRSDirectory();
+// }
+//
+// /**
+// * This method connects this cluster to another one and registers it
+// * in this region server's replication znode
+// * @param peerId id of the peer cluster
+// */
+// private void connectToPeer(String peerId) throws IOException {
+// String[] ensemble =
+// Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
+// split(":");
+// if (ensemble.length != 3) {
+// // NOTE(review): getData appears to return raw bytes (it is wrapped in
+// // Bytes.toString above), so this message would print an array
+// // reference rather than the address.
+// throw new IllegalArgumentException("Wrong format of cluster address: " +
+// this.zookeeperWrapper.getData(this.peersZNode, peerId));
+// }
+// Configuration otherConf = new Configuration(this.conf);
+// otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+// otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+// otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+// ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+// "connection to cluster: " + peerId);
+// zkw.registerListener(new ReplicationStatusWatcher());
+// this.peerClusters.put(peerId, zkw);
+// this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+// this.rsServerNameZnode, peerId));
+// LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+// }
+//
+// /**
+// * This reads the state znode for replication and sets the atomic boolean
+// */
+// private void setReplicating() {
+// String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
+// this.replicationZNode, this.replicationStateNodeName,
+// new ReplicationStatusWatcher()));
+// if (value != null) {
+// this.replicating.set(value.equals("true"));
+// LOG.info("Replication is now " + (this.replicating.get() ?
+// "started" : "stopped"));
+// }
+// }
+//
+// /**
+// * Add a new log to the list of hlogs in zookeeper
+// * @param filename name of the hlog's znode
+// * @param clusterId name of the cluster's znode
+// */
+// public void addLogToList(String filename, String clusterId) {
+// try {
+// this.zookeeperWrapper.writeZNode(
+// this.zookeeperWrapper.getZNode(
+// this.rsServerNameZnode, clusterId), filename, "");
+// } catch (InterruptedException e) {
+// LOG.error(e);
+// } catch (KeeperException e) {
+// LOG.error(e);
+// }
+// }
+//
+// /**
+// * Remove a log from the list of hlogs in zookeeper
+// * @param filename name of the hlog's znode
+// * @param clusterId name of the cluster's znode
+// */
+// public void removeLogFromList(String filename, String clusterId) {
+// try {
+// this.zookeeperWrapper.deleteZNode(
+// this.zookeeperWrapper.getZNode(this.rsServerNameZnode,
+// this.zookeeperWrapper.getZNode(clusterId, filename)));
+// } catch (InterruptedException e) {
+// LOG.error(e);
+// } catch (KeeperException e) {
+// LOG.error(e);
+// }
+// }
+//
+// /**
+// * Set the current position of the specified cluster in the current hlog
+// * @param filename filename name of the hlog's znode
+// * @param clusterId clusterId name of the cluster's znode
+// * @param position the position in the file
+// * @throws IOException
+// */
+// public void writeReplicationStatus(String filename, String clusterId,
+// long position) {
+// try {
+// String clusterZNode = this.zookeeperWrapper.getZNode(
+// this.rsServerNameZnode, clusterId);
+// this.zookeeperWrapper.writeZNode(clusterZNode, filename,
+// Long.toString(position));
+// } catch (InterruptedException e) {
+// LOG.error(e);
+// } catch (KeeperException e) {
+// LOG.error(e);
+// }
+// }
+//
+// /**
+// * Get a list of all the other region servers in this cluster
+// * and set a watch
+// * @param watch the watch to set
+// * @return a list of server names
+// */
+// public List<String> getRegisteredRegionServers(Watcher watch) {
+// return this.zookeeperWrapper.listZnodes(
+// this.zookeeperWrapper.getRsZNode(), watch);
+// }
+//
+// /**
+// * Get the list of the replicators that have queues, they can be alive, dead
+// * or simply from a previous run
+// * @param watch the watch to set
+// * @return a list of server names
+// */
+// public List<String> getListOfReplicators(Watcher watch) {
+// return this.zookeeperWrapper.listZnodes(rsZNode, watch);
+// }
+//
+// /**
+// * Get the list of peer clusters for the specified server names
+// * @param rs server names of the rs
+// * @param watch the watch to set
+// * @return a list of peer cluster
+// */
+// public List<String> getListPeersForRS(String rs, Watcher watch) {
+// return this.zookeeperWrapper.listZnodes(
+// zookeeperWrapper.getZNode(rsZNode, rs), watch);
+// }
+//
+// /**
+// * Get the list of hlogs for the specified region server and peer cluster
+// * @param rs server names of the rs
+// * @param id peer cluster
+// * @param watch the watch to set
+// * @return a list of hlogs
+// */
+// public List<String> getListHLogsForPeerForRS(String rs, String id, Watcher watch) {
+// return this.zookeeperWrapper.listZnodes(
+// zookeeperWrapper.getZNode(zookeeperWrapper.getZNode(rsZNode, rs), id), watch);
+// }
+//
+// /**
+// * Try to set a lock in another server's znode.
+// * @param znode the server names of the other server
+// * @return true if the lock was acquired, false in every other cases
+// */
+// public boolean lockOtherRS(String znode) {
+// try {
+// this.zookeeperWrapper.writeZNode(
+// this.zookeeperWrapper.getZNode(this.rsZNode, znode),
+// RS_LOCK_ZNODE, rsServerNameZnode, true);
+//
+// } catch (InterruptedException e) {
+// LOG.error(e);
+// return false;
+// } catch (KeeperException e) {
+// LOG.debug("Won't lock " + znode + " because " + e.getMessage());
+// // TODO see if the other still exists!!
+// return false;
+// }
+// return true;
+// }
+//
+// /**
+// * This method copies all the hlogs queues from another region server
+// * and returns them all sorted per peer cluster (appended with the dead
+// * server's znode)
+// * @param znode server names to copy
+// * @return all hlogs for all peers of that cluster, null if an error occurred
+// */
+// public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+// // TODO this method isn't atomic enough, we could start copying and then
+// // TODO fail for some reason and we would end up with znodes we don't want.
+// SortedMap<String,SortedSet<String>> queues =
+// new TreeMap<String,SortedSet<String>>();
+// try {
+// String nodePath = this.zookeeperWrapper.getZNode(rsZNode, znode);
+// List<String> clusters = this.zookeeperWrapper.listZnodes(nodePath, null);
+// // We have a lock znode in there, it will count as one.
+// if (clusters == null || clusters.size() <= 1) {
+// return queues;
+// }
+// // The lock isn't a peer cluster, remove it
+// clusters.remove(RS_LOCK_ZNODE);
+// for (String cluster : clusters) {
+// // We add the name of the recovered RS to the new znode, we can even
+// // do that for queues that were recovered 10 times giving a znode like
+// // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+// String newCluster = cluster+"-"+znode;
+// String newClusterZnode =
+// this.zookeeperWrapper.getZNode(rsServerNameZnode, newCluster);
+// this.zookeeperWrapper.ensureExists(newClusterZnode);
+// String clusterPath = this.zookeeperWrapper.getZNode(nodePath, cluster);
+// List<String> hlogs = this.zookeeperWrapper.listZnodes(clusterPath, null);
+// // That region server didn't have anything to replicate for this cluster
+// if (hlogs == null || hlogs.size() == 0) {
+// continue;
+// }
+// SortedSet<String> logQueue = new TreeSet<String>();
+// queues.put(newCluster, logQueue);
+// for (String hlog : hlogs) {
+// String position = Bytes.toString(
+// this.zookeeperWrapper.getData(clusterPath, hlog));
+// LOG.debug("Creating " + hlog + " with data " + position);
+// this.zookeeperWrapper.writeZNode(newClusterZnode, hlog, position);
+// logQueue.add(hlog);
+// }
+// }
+// } catch (InterruptedException e) {
+// LOG.warn(e);
+// return null;
+// } catch (KeeperException e) {
+// LOG.warn(e);
+// return null;
+// }
+// return queues;
+// }
+//
+// /**
+// * Delete a complete queue of hlogs
+// * @param peerZnode znode of the peer cluster queue of hlogs to delete
+// */
+// public void deleteSource(String peerZnode) {
+// try {
+// this.zookeeperWrapper.deleteZNode(
+// this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
+// } catch (InterruptedException e) {
+// LOG.error(e);
+// } catch (KeeperException e) {
+// LOG.error(e);
+// }
+// }
+//
+// /**
+// * Recursive deletion of all znodes in specified rs' znode
+// * @param znode name of the region server's znode under rsZNode
+// */
+// public void deleteRsQueues(String znode) {
+// try {
+// this.zookeeperWrapper.deleteZNode(
+// this.zookeeperWrapper.getZNode(rsZNode, znode), true);
+// } catch (InterruptedException e) {
+// LOG.error(e);
+// } catch (KeeperException e) {
+// LOG.error(e);
+// }
+// }
+//
+// /**
+// * Delete this region server's queues
+// */
+// public void deleteOwnRSZNode() {
+// deleteRsQueues(this.rsServerNameZnode);
+// }
+//
+// /**
+// * Get the position of the specified hlog in the specified peer znode
+// * @param peerId znode of the peer cluster
+// * @param hlog name of the hlog
+// * @return the position in that hlog
+// */
+// public long getHLogRepPosition(String peerId, String hlog) {
+// String clusterZnode =
+// this.zookeeperWrapper.getZNode(rsServerNameZnode, peerId);
+// String data = Bytes.toString(
+// this.zookeeperWrapper.getData(clusterZnode, hlog));
+// return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+// }
+//
+// /**
+// * Tells if this cluster replicates or not
+// *
+// * @return if this is a master
+// */
+// public boolean isReplicationMaster() {
+// return this.replicationMaster;
+// }
+//
+// /**
+// * Get the identification of the cluster
+// *
+// * @return the id for the cluster
+// */
+// public String getClusterId() {
+// return this.clusterId;
+// }
+//
+// /**
+// * Get a map of all peer clusters
+// * @return map of peer cluster, zk address to ZKW
+// */
+// public Map<String, ZooKeeperWrapper> getPeerClusters() {
+// return this.peerClusters;
+// }
+//
+// /**
+// * Watcher for the status of the replication
+// */
+// public class ReplicationStatusWatcher implements Watcher {
+// @Override
+// public void process(WatchedEvent watchedEvent) {
+// Event.EventType type = watchedEvent.getType();
+// LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
+// if (type.equals(Event.EventType.NodeDataChanged)) {
+// setReplicating();
+// }
+// }
+// }
+
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LogCleanerDelegate;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+// REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for
+ * replication before deleting it when its TTL is over.
+ * <p/>
+ * NOTE: the ZooKeeper lookups are commented out on this branch (see the
+ * REENABLE markers), so the cache of queued hlogs is never populated and
+ * every log passed to {@link #isLogDeletable(Path)} is reported as deletable.
+ */
+public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
+
+  private static final Log LOG =
+      LogFactory.getLog(ReplicationLogCleaner.class);
+  private Configuration conf;
+  // Reads the replication queues from ZK; only assigned by the REENABLE code
+  // in setConf, so it is currently always null.
+  private ReplicationZookeeperWrapper zkHelper;
+  // Names of the hlogs found in replication queues during the last refresh
+  private Set<String> hlogs = new HashSet<String>();
+
+  /**
+   * Instantiates the cleaner, does nothing more.
+   */
+  public ReplicationLogCleaner() {}
+
+  /**
+   * Decide whether an hlog can be removed.
+   *
+   * @param filePath path of the candidate hlog; only its name is compared
+   * @return false if the hlog is still listed in a replication queue,
+   * true otherwise
+   */
+  @Override
+  public boolean isLogDeletable(Path filePath) {
+    String log = filePath.getName();
+    // If we saw the hlog previously, let's consider it's still used
+    // At some point in the future we will refresh the list and it will be gone
+    if (this.hlogs.contains(log)) {
+      return false;
+    }
+
+    // Let's see if it's still there
+    // This solution makes every miss very expensive to process since we
+    // almost completely refresh the cache each time
+    return !refreshHLogsAndSearch(log);
+  }
+
+  /**
+   * Search through all the hlogs we have in ZK to refresh the cache
+   * If a log is specified and found, then we early out and return true
+   * @param searchedLog log we are searching for, pass null to cache everything
+   * that's in zookeeper.
+   * @return false until a specified log is found.
+   */
+  private boolean refreshHLogsAndSearch(String searchedLog) {
+    this.hlogs.clear();
+    final boolean lookForLog = searchedLog != null;
+// REENABLE
+// List<String> rss = zkHelper.getListOfReplicators(this);
+// if (rss == null) {
+// LOG.debug("Didn't find any region server that replicates, deleting: " +
+// searchedLog);
+// return false;
+// }
+// for (String rs: rss) {
+// List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
+// // if rs just died, this will be null
+// if (listOfPeers == null) {
+// continue;
+// }
+// for (String id : listOfPeers) {
+// List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
+// if (peersHlogs != null) {
+// this.hlogs.addAll(peersHlogs);
+// }
+// // early exit if we found the log
+// if(lookForLog && this.hlogs.contains(searchedLog)) {
+// LOG.debug("Found log in ZK, keeping: " + searchedLog);
+// return true;
+// }
+// }
+// }
+    // A plain cache refresh (searchedLog == null, as done from setConf) is not
+    // a deletion decision, so only log when a specific log was looked up.
+    if (lookForLog) {
+      LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
+    }
+    return false;
+  }
+
+  /**
+   * Store the configuration and prime the hlog cache.
+   * @param conf configuration to use
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    // REENABLE
+// try {
+// this.zkHelper = new ReplicationZookeeperWrapper(
+// ZooKeeperWrapper.createInstance(this.conf,
+// HMaster.class.getName()),
+// this.conf, new AtomicBoolean(true), null);
+// } catch (IOException e) {
+// LOG.error(e);
+// }
+    refreshHLogsAndSearch(null);
+  }
+
+  /**
+   * @return the configuration last passed to {@link #setConf(Configuration)}
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * ZooKeeper watcher callback; events are currently ignored.
+   */
+  @Override
+  public void process(WatchedEvent watchedEvent) {}
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Replication serves as an umbrella over the setup of replication and
+ * is used by HRS.
+ */
+public class Replication implements LogEntryVisitor {
+
+ private final boolean replication;
+// REENALBE private final ReplicationSourceManager replicationManager;
+ private boolean replicationMaster;
+ private final AtomicBoolean replicating = new AtomicBoolean(true);
+// REENALBE private final ReplicationZookeeperWrapper zkHelper;
+ private final Configuration conf;
+ private ReplicationSink replicationSink;
+ private final Server server;
+
+ /**
+ * Instantiate the replication management (if rep is enabled).
+ * @param conf conf to use
+ * @param hsi the info if this region server
+ * @param fs handle to the filesystem
+ * @param oldLogDir directory where logs are archived
+ * @param stopper This is set when we are to shut down.
+ * @throws IOException
+ */
+ public Replication(final Server server, FileSystem fs, Path logDir,
+ Path oldLogDir) throws IOException {
+ this.server = server;
+ this.conf = this.server.getConfiguration();
+ this.replication =
+ conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+ if (replication) {
+ // REENALBE
+// this.zkHelper = new ReplicationZookeeperWrapper(
+// ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
+// this.replicating, hsi.getServerName());
+// this.replicationMaster = zkHelper.isReplicationMaster();
+// this.replicationManager = this.replicationMaster ?
+// new ReplicationSourceManager(zkHelper, conf, stopRequested,
+// fs, this.replicating, logDir, oldLogDir) : null;
+ } else {
+ // REENABLE replicationManager = null;
+ // REENALBE zkHelper = null;
+ }
+ }
+
+ /**
+ * Join with the replication threads
+ */
+ public void join() {
+ if (this.replication) {
+ if (this.replicationMaster) {
+// REENABLE this.replicationManager.join();
+ }
+// REENABLE this.zkHelper.deleteOwnRSZNode();
+ }
+ }
+
+ /**
+ * Carry on the list of log entries down to the sink
+ * @param entries list of entries to replicate
+ * @throws IOException
+ */
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ if (this.replication && !this.replicationMaster) {
+ this.replicationSink.replicateEntries(entries);
+ }
+ }
+
+ /**
+ * If replication is enabled and this cluster is a master,
+ * it starts
+ * @throws IOException
+ */
+ public void startReplicationServices() throws IOException {
+ if (this.replication) {
+ if (this.replicationMaster) {
+ // REENALBE this.replicationManager.init();
+ } else {
+ this.replicationSink =
+ new ReplicationSink(this.conf, this.server);
+ }
+ }
+ }
+
+ /**
+ * Get the replication sources manager
+ * @return the manager if replication is enabled, else returns false
+ */
+ public ReplicationSourceManager getReplicationManager() {
+ return null; // REENALBE replicationManager;
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) {
+ NavigableMap<byte[], Integer> scopes =
+ new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ byte[] family;
+ for (KeyValue kv : logEdit.getKeyValues()) {
+ family = kv.getFamily();
+ int scope = info.getTableDesc().getFamily(family).getScope();
+ if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
+ !scopes.containsKey(family)) {
+ scopes.put(family, scope);
+ }
+ }
+ if (!scopes.isEmpty()) {
+ logEdit.setScopes(scopes);
+ }
+ }
+
+ /**
+ * Add this class as a log entry visitor for HLog if replication is enabled
+ * @param hlog log that was add ourselves on
+ */
+ public void addLogEntryVisitor(HLog hlog) {
+ if (replication) {
+ hlog.addLogEntryVisitor(this);
+ }
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.Stoppable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster.
+ * <p/>
+ * This replication process is currently waiting for the edits to be applied
+ * before the method can return. This means that the replication of edits
+ * is synchronized (after reading from HLogs in ReplicationSource) and that a
+ * single region server cannot receive edits from two sources at the same time.
+ * <p/>
+ * This class uses the native HBase client in order to replicate entries.
+ * <p/>
+ *
+ * TODO make this class more like ReplicationSource wrt log handling
+ */
+public class ReplicationSink {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+ // Name of the HDFS directory that contains the temporary rep logs
+ public static final String REPLICATION_LOG_DIR = ".replogs";
+ private final Configuration conf;
+ // Pool of tables used to apply the replicated edits
+ private final HTablePool pool;
+ // Chain to pull on when we want all to stop.
+ private final Stoppable stopper;
+ private final ReplicationSinkMetrics metrics;
+
+ /**
+ * Create a sink for replication
+ *
+ * @param conf conf object; "replication.sink.htablepool.capacity" sizes
+ * the table pool (default 10)
+ * @param stopper asked to stop when edits cannot be applied
+ * @throws IOException if the sink cannot be set up
+ */
+ public ReplicationSink(Configuration conf, Stoppable stopper)
+ throws IOException {
+ this.conf = conf;
+ this.pool = new HTablePool(this.conf,
+ conf.getInt("replication.sink.htablepool.capacity", 10));
+ this.stopper = stopper;
+ this.metrics = new ReplicationSinkMetrics();
+ }
+
+ /**
+ * Replicate this array of entries directly into the local cluster
+ * using the native client.
+ * <p/>
+ * Consecutive puts going to the same table are batched together. Pending
+ * puts are always flushed before a delete is applied so that operations
+ * are applied in the order they were received. Entries without any
+ * KeyValue are skipped.
+ *
+ * @param entries the entries to apply, in order
+ * @throws IOException if the edits could not be applied (after asking the
+ * stopper to stop); failures caused by a missing table are only logged
+ */
+ public synchronized void replicateEntries(HLog.Entry[] entries)
+ throws IOException {
+ if (entries.length == 0) {
+ return;
+ }
+ // Very simple optimization where we batch sequences of rows going
+ // to the same table.
+ try {
+ long totalReplicated = 0;
+ byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+ List<Put> puts = new ArrayList<Put>();
+ for (HLog.Entry entry : entries) {
+ WALEdit edit = entry.getEdit();
+ List<KeyValue> kvs = edit.getKeyValues();
+ if (kvs.isEmpty()) {
+ // Nothing to apply, and kvs.get(0) below would throw
+ continue;
+ }
+ byte[] tableName = entry.getKey().getTablename();
+ KeyValue firstKv = kvs.get(0);
+ if (firstKv.isDelete()) {
+ // Flush buffered puts first, else the delete would overtake them
+ put(lastTable, puts);
+ Delete delete = new Delete(firstKv.getRow(),
+ firstKv.getTimestamp(), null);
+ for (KeyValue kv : kvs) {
+ if (kv.isDeleteFamily()) {
+ delete.deleteFamily(kv.getFamily());
+ } else if (!kv.isEmptyColumn()) {
+ delete.deleteColumn(kv.getFamily(),
+ kv.getQualifier());
+ }
+ }
+ delete(tableName, delete);
+ } else {
+ // Switching table, flush
+ if (!Bytes.equals(lastTable, tableName)) {
+ put(lastTable, puts);
+ }
+ // With mini-batching, we need to expect multiple rows per edit
+ byte[] lastKey = firstKv.getRow();
+ Put put = new Put(firstKv.getRow(), firstKv.getTimestamp());
+ for (KeyValue kv : kvs) {
+ if (!Bytes.equals(lastKey, kv.getRow())) {
+ puts.add(put);
+ put = new Put(kv.getRow(), kv.getTimestamp());
+ }
+ put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
+ lastKey = kv.getRow();
+ }
+ puts.add(put);
+ lastTable = tableName;
+ }
+ totalReplicated++;
+ }
+ put(lastTable, puts);
+ this.metrics.setAgeOfLastAppliedOp(
+ entries[entries.length-1].getKey().getWriteTime());
+ this.metrics.appliedBatchesRate.inc(1);
+ LOG.info("Total replicated: " + totalReplicated);
+ } catch (IOException ex) {
+ if (ex.getCause() instanceof TableNotFoundException) {
+ LOG.warn("Losing edits because: ", ex);
+ } else {
+ // Should we log rejected edits in a file for replay?
+ LOG.error("Unable to accept edit because", ex);
+ this.stopper.stop("Unable to accept edit because " + ex.getMessage());
+ throw ex;
+ }
+ } catch (RuntimeException re) {
+ if (re.getCause() instanceof TableNotFoundException) {
+ LOG.warn("Losing edits because: ", re);
+ } else {
+ LOG.error("Replication stopped us because", re);
+ this.stopper.stop("Replication stopped us because " + re.getMessage());
+ throw re;
+ }
+ }
+ }
+
+ /**
+ * Apply the buffered puts to one table, then clear the buffer.
+ * Does nothing when the buffer is empty.
+ * @param tableName table to insert into
+ * @param puts list of puts; cleared only if the put succeeded
+ * @throws IOException if the put fails
+ */
+ private void put(byte[] tableName, List<Put> puts) throws IOException {
+ if (puts.isEmpty()) {
+ return;
+ }
+ HTableInterface table = null;
+ try {
+ table = this.pool.getTable(tableName);
+ table.put(puts);
+ this.metrics.appliedOpsRate.inc(puts.size());
+ puts.clear();
+ } finally {
+ // Hand the table back to the pool exactly once, success or failure
+ if (table != null) {
+ this.pool.putTable(table);
+ }
+ }
+ }
+
+ /**
+ * Apply a single delete to one table.
+ * @param tableName table to delete in
+ * @param delete the delete to use
+ * @throws IOException if the delete fails
+ */
+ private void delete(byte[] tableName, Delete delete) throws IOException {
+ HTableInterface table = null;
+ try {
+ table = this.pool.getTable(tableName);
+ table.delete(delete);
+ this.metrics.appliedOpsRate.inc(1);
+ } finally {
+ // Hand the table back to the pool exactly once, success or failure
+ if (table != null) {
+ this.pool.putTable(table);
+ }
+ }
+ }
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.metrics.MetricsRate;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsLongValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+/**
+ * This class is for maintaining the various replication statistics
+ * for a sink and publishing them through the metrics interfaces.
+ */
+public class ReplicationSinkMetrics implements Updater {
+ private final MetricsRecord metricsRecord;
+ // Declared before the metrics below because they are constructed with it
+ private final MetricsRegistry registry = new MetricsRegistry();
+
+ /** Rate of operations applied by the sink */
+ public final MetricsRate appliedOpsRate =
+ new MetricsRate("appliedOpsRate", registry);
+
+ /** Rate of batches (of operations) applied by the sink */
+ public final MetricsRate appliedBatchesRate =
+ new MetricsRate("appliedBatchesRate", registry);
+
+ /** Age of the last operation that was applied by the sink */
+ private final MetricsLongValue ageOfLastAppliedOp =
+ new MetricsLongValue("ageOfLastAppliedOp", registry);
+
+ /**
+ * Constructor used to register the metrics: creates a "replication" record
+ * in the "hbase" context, tagged with the current thread's name, registers
+ * this updater and exports the registry over JMX as "ReplicationSink".
+ */
+ public ReplicationSinkMetrics() {
+ MetricsContext context = MetricsUtil.getContext("hbase");
+ String name = Thread.currentThread().getName();
+ metricsRecord = MetricsUtil.createRecord(context, "replication");
+ metricsRecord.setTag("RegionServer", name);
+ context.registerUpdater(this);
+ // export for JMX
+ new ReplicationStatistics(this.registry, "ReplicationSink");
+ }
+
+ /**
+ * Set the age of the last edit that was applied, i.e. the time elapsed
+ * between its write time and now.
+ * @param timestamp write time of the edit
+ */
+ public void setAgeOfLastAppliedOp(long timestamp) {
+ ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp);
+ }
+
+ /**
+ * Push every metric into the record, then publish the record.
+ * @param metricsContext the context requesting the update (unused)
+ */
+ @Override
+ public void doUpdates(MetricsContext metricsContext) {
+ synchronized (this) {
+ this.appliedOpsRate.pushMetric(this.metricsRecord);
+ this.appliedBatchesRate.pushMetric(this.metricsRecord);
+ this.ageOfLastAppliedOp.pushMetric(this.metricsRecord);
+ }
+ this.metricsRecord.update();
+ }
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,696 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Class that handles the source of a replication stream.
+ * Currently does not handle more than 1 slave
+ * For each slave cluster it selects a random number of peers
+ * using a replication ratio. For example, if replication ration = 0.1
+ * and slave cluster has 100 region servers, 10 will be selected.
+ * <p/>
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ * <p/>
+ *
+ */
+public class ReplicationSource extends Thread
+ implements ReplicationSourceInterface {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+ // Queue of logs to process
+ private PriorityBlockingQueue<Path> queue;
+ // container of entries to replicate
+ private HLog.Entry[] entriesArray;
+ private HConnection conn;
+ // Helper class for zookeeper
+ private ReplicationZookeeperWrapper zkHelper;
+ private Configuration conf;
+ // ratio of region servers to chose from a slave cluster
+ private float ratio;
+ private Random random;
+ // should we replicate or not?
+ private AtomicBoolean replicating;
+ // id of the peer cluster this source replicates to
+ private String peerClusterId;
+ // The manager of all sources to which we ping back our progress
+ private ReplicationSourceManager manager;
+ // Should we stop everything?
+ private AtomicBoolean stop;
+ // List of chosen sinks (region servers)
+ private List<HServerAddress> currentPeers;
+ // How long should we sleep for each retry
+ private long sleepForRetries;
+ // Max size in bytes of entriesArray
+ private long replicationQueueSizeCapacity;
+ // Max number of entries in entriesArray
+ private int replicationQueueNbCapacity;
+ // Our reader for the current log
+ private HLog.Reader reader;
+ // Current position in the log
+ private long position = 0;
+ // Path of the current log
+ private volatile Path currentPath;
+ private FileSystem fs;
+ // id of this cluster
+ private byte clusterId;
+ // total number of edits we replicated
+ private long totalReplicatedEdits = 0;
+ // The znode we currently play with
+ private String peerClusterZnode;
+ // Indicates if this queue is recovered (and will be deleted when depleted)
+ private boolean queueRecovered;
+ // List of all the dead region servers that had this queue (if recovered)
+ private String[] deadRegionServers;
+ // Maximum number of retries before taking bold actions
+ private long maxRetriesMultiplier;
+ // Current number of entries that we need to replicate
+ private int currentNbEntries = 0;
+ // Current number of operations (Put/Delete) that we need to replicate
+ private int currentNbOperations = 0;
+ // Indicates if this particular source is running
+ private volatile boolean running = true;
+ // Metrics for this source
+ private ReplicationSourceMetrics metrics;
+
+ /**
+ * Instantiation method used by region servers
+ *
+ * @param conf configuration to use
+ * @param fs file system to use
+ * @param manager replication manager to ping to
+ * @param stopper the atomic boolean to use to stop the regionserver
+ * @param replicating the atomic boolean that starts/stops replication
+ * @param peerClusterZnode the name of our znode
+ * @throws IOException
+ */
+ public void init(final Configuration conf,
+ final FileSystem fs,
+ final ReplicationSourceManager manager,
+ final AtomicBoolean stopper,
+ final AtomicBoolean replicating,
+ final String peerClusterZnode)
+ throws IOException {
+ this.stop = stopper;
+ this.conf = conf;
+ this.replicationQueueSizeCapacity =
+ this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
+ this.replicationQueueNbCapacity =
+ this.conf.getInt("replication.source.nb.capacity", 25000);
+ this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
+ for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
+ this.entriesArray[i] = new HLog.Entry();
+ }
+ this.maxRetriesMultiplier =
+ this.conf.getLong("replication.source.maxretriesmultiplier", 10);
+ this.queue =
+ new PriorityBlockingQueue<Path>(
+ conf.getInt("hbase.regionserver.maxlogs", 32),
+ new LogsComparator());
+ this.conn = HConnectionManager.getConnection(conf);
+ // REENABLE this.zkHelper = manager.getRepZkWrapper();
+ this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
+ this.currentPeers = new ArrayList<HServerAddress>();
+ this.random = new Random();
+ this.replicating = replicating;
+ this.manager = manager;
+ this.sleepForRetries =
+ this.conf.getLong("replication.source.sleepforretries", 1000);
+ this.fs = fs;
+ // REENALBE this.clusterId = Byte.valueOf(zkHelper.getClusterId());
+ this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
+
+ // Finally look if this is a recovered queue
+ this.checkIfQueueRecovered(peerClusterZnode);
+ }
+
+ // The passed znode will be either the id of the peer cluster or
+ // the handling story of that queue in the form of id-servername-*
+ private void checkIfQueueRecovered(String peerClusterZnode) {
+ String[] parts = peerClusterZnode.split("-");
+ this.queueRecovered = parts.length != 1;
+ this.peerClusterId = this.queueRecovered ?
+ parts[0] : peerClusterZnode;
+ this.peerClusterZnode = peerClusterZnode;
+ this.deadRegionServers = new String[parts.length-1];
+ // Extract all the places where we could find the hlogs
+ for (int i = 1; i < parts.length; i++) {
+ this.deadRegionServers[i-1] = parts[i];
+ }
+ }
+
+ /**
+ * Select a number of peers at random using the ratio. Mininum 1.
+ */
+ private void chooseSinks() {
+ // REENABLE
+// this.currentPeers.clear();
+// List<HServerAddress> addresses =
+// this.zkHelper.getPeersAddresses(peerClusterId);
+// Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
+// int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
+// LOG.info("Getting " + nbPeers +
+// " rs from peer cluster # " + peerClusterId);
+// for (int i = 0; i < nbPeers; i++) {
+// HServerAddress address;
+// // Make sure we get one address that we don't already have
+// do {
+// address = addresses.get(this.random.nextInt(addresses.size()));
+// } while (setOfAddr.contains(address));
+// LOG.info("Choosing peer " + address);
+// setOfAddr.add(address);
+// }
+// this.currentPeers.addAll(setOfAddr);
+ }
+
+ @Override
+ public void enqueueLog(Path log) {
+ this.queue.put(log);
+ this.metrics.sizeOfLogQueue.set(queue.size());
+ }
+
+ @Override
+ public void run() {
+ connectToPeers();
+ // We were stopped while looping to connect to sinks, just abort
+ if (this.stop.get()) {
+ return;
+ }
+ // If this is recovered, the queue is already full and the first log
+ // normally has a position (unless the RS failed between 2 logs)
+ if (this.queueRecovered) {
+// this.position = this.zkHelper.getHLogRepPosition(
+// this.peerClusterZnode, this.queue.peek().getName());
+ }
+ int sleepMultiplier = 1;
+ // Loop until we close down
+ while (!stop.get() && this.running) {
+ // Get a new path
+ if (!getNextPath()) {
+ if (sleepForRetries("No log to process", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ // Open a reader on it
+ if (!openReader(sleepMultiplier)) {
+ // Reset the sleep multiplier, else it'd be reused for the next file
+ sleepMultiplier = 1;
+ continue;
+ }
+
+ // If we got a null reader but didn't continue, then sleep and continue
+ if (this.reader == null) {
+ if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+
+ boolean gotIOE = false;
+ currentNbEntries = 0;
+ try {
+ if(readAllEntriesToReplicateOrNextFile()) {
+ continue;
+ }
+ } catch (IOException ioe) {
+ LOG.warn(peerClusterZnode + " Got: ", ioe);
+ gotIOE = true;
+ if (ioe.getCause() instanceof EOFException) {
+
+ boolean considerDumping = false;
+ if (this.queueRecovered) {
+ try {
+ FileStatus stat = this.fs.getFileStatus(this.currentPath);
+ if (stat.getLen() == 0) {
+ LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+ }
+ considerDumping = true;
+ } catch (IOException e) {
+ LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+ }
+ } else if (currentNbEntries != 0) {
+ LOG.warn(peerClusterZnode + " Got EOF while reading, " +
+ "looks like this file is broken? " + currentPath);
+ considerDumping = true;
+ currentNbEntries = 0;
+ }
+
+ if (considerDumping &&
+ sleepMultiplier == this.maxRetriesMultiplier &&
+ processEndOfFile()) {
+ continue;
+ }
+ }
+ } finally {
+ try {
+ // if current path is null, it means we processEndOfFile hence
+ if (this.currentPath != null && !gotIOE) {
+ this.position = this.reader.getPosition();
+ }
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ } catch (IOException e) {
+ gotIOE = true;
+ LOG.warn("Unable to finalize the tailing of a file", e);
+ }
+ }
+
+ // If we didn't get anything to replicate, or if we hit a IOE,
+ // wait a bit and retry.
+ // But if we need to stop, don't bother sleeping
+ if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
+ if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ sleepMultiplier = 1;
+ shipEdits();
+
+ }
+ LOG.debug("Source exiting " + peerClusterId);
+ }
+
+ /**
+ * Read all the entries from the current log files and retain those
+ * that need to be replicated. Else, process the end of the current file.
+ * @return true if we got nothing and went to the next file, false if we got
+ * entries
+ * @throws IOException
+ */
+ protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+ long seenEntries = 0;
+ if (this.position != 0) {
+ this.reader.seek(this.position);
+ }
+ HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
+ while (entry != null) {
+ WALEdit edit = entry.getEdit();
+ this.metrics.logEditsReadRate.inc(1);
+ seenEntries++;
+ // Remove all KVs that should not be replicated
+ removeNonReplicableEdits(edit);
+ HLogKey logKey = entry.getKey();
+ // Don't replicate catalog entries, if the WALEdit wasn't
+ // containing anything to replicate and if we're currently not set to replicate
+ if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
+ Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
+ edit.size() != 0 && replicating.get()) {
+ logKey.setClusterId(this.clusterId);
+ currentNbOperations += countDistinctRowKeys(edit);
+ currentNbEntries++;
+ } else {
+ this.metrics.logEditsFilteredRate.inc(1);
+ }
+ // Stop if too many entries or too big
+ if ((this.reader.getPosition() - this.position)
+ >= this.replicationQueueSizeCapacity ||
+ currentNbEntries >= this.replicationQueueNbCapacity) {
+ break;
+ }
+ entry = this.reader.next(entriesArray[currentNbEntries]);
+ }
+ LOG.debug("currentNbOperations:" + currentNbOperations +
+ " and seenEntries:" + seenEntries +
+ " and size: " + (this.reader.getPosition() - this.position));
+ // If we didn't get anything and the queue has an object, it means we
+ // hit the end of the file for sure
+ return seenEntries == 0 && processEndOfFile();
+ }
+
+ private void connectToPeers() {
+ // Connect to peer cluster first, unless we have to stop
+ while (!this.stop.get() && this.currentPeers.size() == 0) {
+ try {
+ chooseSinks();
+ Thread.sleep(this.sleepForRetries);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while trying to connect to sinks", e);
+ }
+ }
+ }
+
+ /**
+ * Poll for the next path
+ * @return true if a path was obtained, false if not
+ */
+ protected boolean getNextPath() {
+ try {
+ if (this.currentPath == null) {
+ this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
+ this.metrics.sizeOfLogQueue.set(queue.size());
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while reading edits", e);
+ }
+ return this.currentPath != null;
+ }
+
+ /**
+ * Open a reader on the current path
+ *
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return true if we should continue with that file, false if we are over with it
+ */
+ protected boolean openReader(int sleepMultiplier) {
+ // REENABLE
+// try {
+// LOG.info("Opening log for replication " + this.currentPath.getName() +
+// " at " + this.position);
+// try {
+// this.reader = null;
+// this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+// } catch (FileNotFoundException fnfe) {
+// if (this.queueRecovered) {
+// // We didn't find the log in the archive directory, look if it still
+// // exists in the dead RS folder (there could be a chain of failures
+// // to look at)
+// for (int i = this.deadRegionServers.length - 1; i > 0; i--) {
+// Path deadRsDirectory =
+// new Path(this.manager.getLogDir(), this.deadRegionServers[i]);
+// Path possibleLogLocation =
+// new Path(deadRsDirectory, currentPath.getName());
+// if (this.manager.getFs().exists(possibleLogLocation)) {
+// // We found the right new location
+// LOG.info("Log " + this.currentPath + " still exists at " +
+// possibleLogLocation);
+// // Breaking here will make us sleep since reader is null
+// break;
+// }
+// }
+// // TODO What happens if the log was missing from every single location?
+// // Although we need to check a couple of times as the log could have
+// // been moved by the master between the checks
+// } else {
+// // If the log was archived, continue reading from there
+// Path archivedLogLocation =
+// new Path(manager.getOldLogDir(), currentPath.getName());
+// if (this.manager.getFs().exists(archivedLogLocation)) {
+// currentPath = archivedLogLocation;
+// LOG.info("Log " + this.currentPath + " was moved to " +
+// archivedLogLocation);
+// // Open the log at the new location
+// this.openReader(sleepMultiplier);
+//
+// }
+// // TODO What happens the log is missing in both places?
+// }
+// }
+// } catch (IOException ioe) {
+// LOG.warn(peerClusterZnode + " Got: ", ioe);
+// // TODO Need a better way to determinate if a file is really gone but
+// // TODO without scanning all logs dir
+// if (sleepMultiplier == this.maxRetriesMultiplier) {
+// LOG.warn("Waited too long for this file, considering dumping");
+// return !processEndOfFile();
+// }
+// }
+ return true;
+ }
+
+ /**
+ * Do the sleeping logic
+ * @param msg Why we sleep
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return
+ */
+ protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+ try {
+ LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ Thread.sleep(this.sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping between retries");
+ }
+ return sleepMultiplier < maxRetriesMultiplier;
+ }
+
+ /**
+ * We only want KVs that are scoped other than local
+ * @param edit The KV to check for replication
+ */
+ protected void removeNonReplicableEdits(WALEdit edit) {
+ NavigableMap<byte[], Integer> scopes = edit.getScopes();
+ List<KeyValue> kvs = edit.getKeyValues();
+ for (int i = 0; i < edit.size(); i++) {
+ KeyValue kv = kvs.get(i);
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+ kvs.remove(i);
+ i--;
+ }
+ }
+ }
+
+ /**
+ * Count the number of different row keys in the given edit because of
+ * mini-batching. We assume that there's at least one KV in the WALEdit.
+ * @param edit edit to count row keys from
+ * @return number of different row keys
+ */
+ private int countDistinctRowKeys(WALEdit edit) {
+ List<KeyValue> kvs = edit.getKeyValues();
+ int distinctRowKeys = 1;
+ KeyValue lastKV = kvs.get(0);
+ for (int i = 0; i < edit.size(); i++) {
+ if (!kvs.get(i).matchingRow(lastKV)) {
+ distinctRowKeys++;
+ }
+ }
+ return distinctRowKeys;
+ }
+
+ /**
+ * Do the shipping logic
+ */
+ protected void shipEdits() {
+ // REENABLE
+// int sleepMultiplier = 1;
+// while (!stop.get()) {
+// try {
+// HRegionInterface rrs = getRS();
+// LOG.debug("Replicating " + currentNbEntries);
+// rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
+// this.manager.logPositionAndCleanOldLogs(this.currentPath,
+// this.peerClusterZnode, this.position, queueRecovered);
+// this.totalReplicatedEdits += currentNbEntries;
+// this.metrics.shippedBatchesRate.inc(1);
+// this.metrics.shippedOpsRate.inc(
+// this.currentNbOperations);
+// this.metrics.setAgeOfLastShippedOp(
+// this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
+// LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
+// break;
+//
+// } catch (IOException ioe) {
+// LOG.warn("Unable to replicate because ", ioe);
+// try {
+// boolean down;
+// do {
+// down = isSlaveDown();
+// if (down) {
+// LOG.debug("The region server we tried to ping didn't answer, " +
+// "sleeping " + sleepForRetries + " times " + sleepMultiplier);
+// Thread.sleep(this.sleepForRetries * sleepMultiplier);
+// if (sleepMultiplier < maxRetriesMultiplier) {
+// sleepMultiplier++;
+// } else {
+// chooseSinks();
+// }
+// }
+// } while (!stop.get() && down);
+// } catch (InterruptedException e) {
+// LOG.debug("Interrupted while trying to contact the peer cluster");
+// }
+//
+// }
+// }
+ }
+
+ /**
+ * If the queue isn't empty, switch to the next one
+ * Else if this is a recovered queue, it means we're done!
+ * Else we'll just continue to try reading the log file
+ * @return true if we're done with the current file, false if we should
+ * continue trying to read from it
+ */
+ protected boolean processEndOfFile() {
+ // REENABLE
+// if (this.queue.size() != 0) {
+// this.currentPath = null;
+// this.position = 0;
+// return true;
+// } else if (this.queueRecovered) {
+// this.manager.closeRecoveredQueue(this);
+// this.abort();
+// return true;
+// }
+ return false;
+ }
+
+ public void startup() {
+ String n = Thread.currentThread().getName();
+ Thread.UncaughtExceptionHandler handler =
+ new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(final Thread t, final Throwable e) {
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ abort();
+ }
+ };
+ Threads.setDaemonThreadRunning(
+ this, n + ".replicationSource," + clusterId, handler);
+ }
+
+ /**
+ * Hastily stop the replication, then wait for shutdown
+ */
+ private void abort() {
+ LOG.info("abort");
+ this.running = false;
+ terminate();
+ }
+
+ public void terminate() {
+ LOG.info("terminate");
+ Threads.shutdown(this, this.sleepForRetries);
+ }
+
+ /**
+ * Get a new region server at random from this peer
+ * @return
+ * @throws IOException
+ */
+ private HRegionInterface getRS() throws IOException {
+ if (this.currentPeers.size() == 0) {
+ throw new IOException(this.peerClusterZnode + " has 0 region servers");
+ }
+ HServerAddress address =
+ currentPeers.get(random.nextInt(this.currentPeers.size()));
+ return this.conn.getHRegionConnection(address);
+ }
+
+ /**
+ * Check if the slave is down by trying to establish a connection
+ * @return true if down, false if up
+ * @throws InterruptedException
+ */
+ public boolean isSlaveDown() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread pingThread = new Thread() {
+ public void run() {
+ try {
+ HRegionInterface rrs = getRS();
+ // Dummy call which should fail
+ rrs.getHServerInfo();
+ latch.countDown();
+ } catch (IOException ex) {
+ LOG.info("Slave cluster looks down: " + ex.getMessage());
+ }
+ }
+ };
+ pingThread.start();
+ // awaits returns true if countDown happened
+ boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
+ pingThread.interrupt();
+ return down;
+ }
+
+ /**
+ * Get the id that the source is replicating to
+ *
+ * @return peer cluster id
+ */
+ public String getPeerClusterZnode() {
+ return this.peerClusterZnode;
+ }
+
+ /**
+ * Get the path of the current HLog
+ * @return current hlog's path
+ */
+ public Path getCurrentPath() {
+ return this.currentPath;
+ }
+
+ /**
+ * Comparator used to compare logs together based on their start time
+ */
+ public static class LogsComparator implements Comparator<Path> {
+
+ @Override
+ public int compare(Path o1, Path o2) {
+ return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return true;
+ }
+
+ /**
+ * Split a path to get the start time
+ * For example: 10.20.20.171%3A60020.1277499063250
+ * @param p path to split
+ * @return start time
+ */
+ private long getTS(Path p) {
+ String[] parts = p.getName().split("\\.");
+ return Long.parseLong(parts[parts.length-1]);
+ }
+ }
+
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Contract for a replication source: it is handed logs to replicate and
+ * replicates them to a single peer cluster.
+ */
+public interface ReplicationSourceInterface {
+
+ /**
+ * Sets up the source. Presumably called once, before {@link #startup()}.
+ *
+ * @param conf configuration used by the source
+ * @param fs file system used by the source
+ * @param manager replication source manager to use
+ * @param stopper stop flag of this region server
+ * @param replicating replication status flag for this cluster
+ * @param peerClusterId id of the peer cluster to replicate to
+ * @throws IOException if the source cannot be initialized
+ */
+ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ AtomicBoolean stopper, AtomicBoolean replicating, String peerClusterId)
+ throws IOException;
+
+ /**
+ * Appends a log to the queue of logs this source must replicate.
+ *
+ * @param log path of the log to queue
+ */
+ void enqueueLog(Path log);
+
+ /**
+ * @return the log currently being replicated
+ */
+ Path getCurrentPath();
+
+ /** Begins replicating. */
+ void startup();
+
+ /** Stops replicating. */
+ void terminate();
+
+ /**
+ * @return the id of the peer cluster this source replicates to
+ */
+ String getPeerClusterZnode();
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,383 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is responsible for managing all the replication
+ * sources. There are two classes of sources:
+ * <ul>
+ * <li> Normal sources are persistent and one per peer cluster</li>
+ * <li> Old sources are recovered from a failed region server and our
+ * only goal is to finish replicating the HLog queue it had up in ZK</li>
+ * </ul>
+ *
+ * When a region server dies, this class uses a watcher to get notified and it
+ * tries to grab a lock in order to transfer all the queues in a local
+ * old source.
+ *
+ * <p>NOTE(review): in this revision the behavior described above is disabled.
+ * The implementation is kept commented out below the REENABLE marker, and the
+ * only live member is {@link #logRolled(Path)}, which does nothing.
+ */
+public class ReplicationSourceManager implements LogActionsListener {
+
+ /**
+ * {@link LogActionsListener} callback for a log roll.
+ * Intentionally a no-op for now. The commented-out implementation below
+ * records the new log and enqueues it on every normal source.
+ * @param newFile path of the new log (presumably the freshly rolled file)
+ */
+ @Override
+ public void logRolled(Path newFile) {
+ // TODO: no-op until the replication code below is re-enabled.
+
+ }
+ // REENABLE
+ // The original implementation follows, commented out. Check the
+ // NOTE(review) markers inside it before un-commenting.
+//
+// private static final Log LOG =
+// LogFactory.getLog(ReplicationSourceManager.class);
+// // List of all the sources that read this RS's logs
+// private final List<ReplicationSourceInterface> sources;
+// // List of all the sources we got from died RSs
+// private final List<ReplicationSourceInterface> oldsources;
+// // Indicates if we are currently replicating
+// private final AtomicBoolean replicating;
+// // Helper for zookeeper
+// private final ReplicationZookeeperWrapper zkHelper;
+// // Indicates if the region server is closing
+// private final AtomicBoolean stopper;
+// // All logs we are currently tracking
+// private final SortedSet<String> hlogs;
+// private final Configuration conf;
+// private final FileSystem fs;
+// // The path to the latest log we saw, for new coming sources
+// private Path latestPath;
+// // List of all the other region servers in this cluster
+// private final List<String> otherRegionServers;
+// // Path to the hlogs directories
+// private final Path logDir;
+// // Path to the hlog archive
+// private final Path oldLogDir;
+//
+// /**
+// * Creates a replication manager and sets the watch on all the other
+// * registered region servers
+// * @param zkHelper the zk helper for replication
+// * @param conf the configuration to use
+// * @param stopper the stopper object for this region server
+// * @param fs the file system to use
+// * @param replicating the status of the replication on this cluster
+// * @param logDir the directory that contains all hlog directories of live RSs
+// * @param oldLogDir the directory where old logs are archived
+// */
+// public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+// final Configuration conf,
+// final AtomicBoolean stopper,
+// final FileSystem fs,
+// final AtomicBoolean replicating,
+// final Path logDir,
+// final Path oldLogDir) {
+// this.sources = new ArrayList<ReplicationSourceInterface>();
+// this.replicating = replicating;
+// this.zkHelper = zkHelper;
+// this.stopper = stopper;
+// this.hlogs = new TreeSet<String>();
+// this.oldsources = new ArrayList<ReplicationSourceInterface>();
+// this.conf = conf;
+// this.fs = fs;
+// this.logDir = logDir;
+// this.oldLogDir = oldLogDir;
+// List<String> otherRSs =
+// this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+// this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+// }
+//
+// /**
+// * Provide the id of the peer and a log key and this method will figure which
+// * hlog it belongs to and will log, for this region server, the current
+// * position. It will also clean old logs from the queue.
+// * @param log Path to the log currently being replicated from
+// * (its file name is the key under which the position is written to zookeeper)
+// * @param id id of the peer cluster
+// * @param position current location in the log
+// * @param queueRecovered indicates if this queue comes from another region server
+// */
+// public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+// String key = log.getName();
+// LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
+// this.zkHelper.writeReplicationStatus(key.toString(), id, position);
+// synchronized (this.hlogs) {
+// // NOTE(review): `!=` compares String references, not contents; this
+// // probably should be !key.equals(this.hlogs.first()). first() also
+// // throws NoSuchElementException if hlogs is empty. Confirm before re-enabling.
+// if (!queueRecovered && this.hlogs.first() != key) {
+// SortedSet<String> hlogSet = this.hlogs.headSet(key);
+// LOG.info("Removing " + hlogSet.size() +
+// " logs in the list: " + hlogSet);
+// for (String hlog : hlogSet) {
+// this.zkHelper.removeLogFromList(hlog.toString(), id);
+// }
+// hlogSet.clear();
+// }
+// }
+// }
+//
+// /**
+// * Adds a normal source per registered peer cluster and tries to process all
+// * old region server hlog queues
+// */
+// public void init() throws IOException {
+// for (String id : this.zkHelper.getPeerClusters().keySet()) {
+// ReplicationSourceInterface src = addSource(id);
+// src.startup();
+// }
+// List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+// synchronized (otherRegionServers) {
+// LOG.info("Current list of replicators: " + currentReplicators
+// + " other RSs: " + otherRegionServers);
+// }
+// // Look if there's anything to process after a restart
+// for (String rs : currentReplicators) {
+// synchronized (otherRegionServers) {
+// if (!this.otherRegionServers.contains(rs)) {
+// transferQueues(rs);
+// }
+// }
+// }
+// }
+//
+// /**
+// * Add a new normal source to this region server
+// * @param id the id of the peer cluster
+// * @return the created source
+// * @throws IOException
+// */
+// public ReplicationSourceInterface addSource(String id) throws IOException {
+// ReplicationSourceInterface src =
+// getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+// this.sources.add(src);
+// synchronized (this.hlogs) {
+// if (this.hlogs.size() > 0) {
+// // NOTE(review): this records hlogs.first() under sources.get(0)'s
+// // znode, which is not necessarily src, while enqueueing latestPath
+// // on src. Verify the intended log and peer before re-enabling.
+// this.zkHelper.addLogToList(this.hlogs.first(),
+// this.sources.get(0).getPeerClusterZnode());
+// src.enqueueLog(this.latestPath);
+// }
+// }
+// return src;
+// }
+//
+// /**
+// * Terminate the replication on this region server
+// */
+// public void join() {
+// if (this.sources.size() == 0) {
+// this.zkHelper.deleteOwnRSZNode();
+// }
+// for (ReplicationSourceInterface source : this.sources) {
+// source.terminate();
+// }
+// }
+//
+// /**
+// * Get a copy of the hlogs of the first source on this rs
+// * @return a sorted set of hlog names
+// */
+// protected SortedSet<String> getHLogs() {
+// // NOTE(review): raw TreeSet; new TreeSet<String>(...) avoids an unchecked warning.
+// return new TreeSet(this.hlogs);
+// }
+//
+// /**
+// * Get a list of all the normal sources of this rs
+// * @return list of all sources
+// */
+// public List<ReplicationSourceInterface> getSources() {
+// return this.sources;
+// }
+//
+// @Override
+// public void logRolled(Path newLog) {
+// if (this.sources.size() > 0) {
+// this.zkHelper.addLogToList(newLog.getName(),
+// this.sources.get(0).getPeerClusterZnode());
+// }
+// synchronized (this.hlogs) {
+// this.hlogs.add(newLog.getName());
+// }
+// this.latestPath = newLog;
+// // This only updates the sources we own, not the recovered ones
+// for (ReplicationSourceInterface source : this.sources) {
+// source.enqueueLog(newLog);
+// }
+// }
+//
+// /**
+// * Get the ZK helper of this manager
+// * @return the helper
+// */
+// public ReplicationZookeeperWrapper getRepZkWrapper() {
+// return zkHelper;
+// }
+//
+// /**
+// * Factory method to create a replication source
+// * @param conf the configuration to use
+// * @param fs the file system to use
+// * @param manager the manager to use
+// * @param stopper the stopper object for this region server
+// * @param replicating the status of the replication on this cluster
+// * @param peerClusterId the id of the peer cluster
+// * @return the created source
+// * @throws IOException
+// */
+// public ReplicationSourceInterface getReplicationSource(
+// final Configuration conf,
+// final FileSystem fs,
+// final ReplicationSourceManager manager,
+// final AtomicBoolean stopper,
+// final AtomicBoolean replicating,
+// final String peerClusterId) throws IOException {
+// ReplicationSourceInterface src;
+// try {
+// Class c = Class.forName(conf.get("replication.replicationsource.implementation",
+// ReplicationSource.class.getCanonicalName()));
+// src = (ReplicationSourceInterface) c.newInstance();
+// } catch (Exception e) {
+// LOG.warn("Passed replication source implemention throws errors, " +
+// "defaulting to ReplicationSource", e);
+// src = new ReplicationSource();
+//
+// }
+// src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+// return src;
+// }
+//
+// /**
+// * Transfer all the queues of the specified region server to this one.
+// * First it tries to grab a lock and if it works it will move the
+// * znodes and finally will delete the old znodes.
+// *
+// * It creates one old source for any type of source of the old rs.
+// * @param rsZnode name of the region server znode whose queues are moved
+// */
+// public void transferQueues(String rsZnode) {
+// // We try to lock that rs' queue directory
+// if (this.stopper.get()) {
+// LOG.info("Not transferring queue since we are shutting down");
+// return;
+// }
+// if (!this.zkHelper.lockOtherRS(rsZnode)) {
+// return;
+// }
+// LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+// SortedMap<String, SortedSet<String>> newQueues =
+// this.zkHelper.copyQueuesFromRS(rsZnode);
+// if (newQueues == null || newQueues.size() == 0) {
+// return;
+// }
+// this.zkHelper.deleteRsQueues(rsZnode);
+//
+// for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+// String peerId = entry.getKey();
+// try {
+// ReplicationSourceInterface src = getReplicationSource(this.conf,
+// this.fs, this, this.stopper, this.replicating, peerId);
+// this.oldsources.add(src);
+// for (String hlog : entry.getValue()) {
+// src.enqueueLog(new Path(this.oldLogDir, hlog));
+// }
+// src.startup();
+// } catch (IOException e) {
+// // TODO manage it
+// LOG.error("Failed creating a source", e);
+// }
+// }
+// }
+//
+// /**
+// * Clear the references to the specified old source
+// * @param src source to clear
+// */
+// public void closeRecoveredQueue(ReplicationSourceInterface src) {
+// LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+// this.oldsources.remove(src);
+// this.zkHelper.deleteSource(src.getPeerClusterZnode());
+// }
+//
+// /**
+// * Watcher used to be notified of the other region server's death
+// * in the local cluster. It initiates the process to transfer the queues
+// * if it is able to grab the lock.
+// */
+// public class OtherRegionServerWatcher implements Watcher {
+// @Override
+// public void process(WatchedEvent watchedEvent) {
+// LOG.info(" event " + watchedEvent);
+// // NOTE(review): getType() returns an Event.EventType (it is compared to
+// // Event.EventType.NodeDeleted below), which never equals a
+// // Event.KeeperState constant, so this early return cannot fire.
+// // getState() was probably intended.
+// if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
+// watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
+// return;
+// }
+//
+// List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
+// if (newRsList == null) {
+// return;
+// } else {
+// synchronized (otherRegionServers) {
+// otherRegionServers.clear();
+// otherRegionServers.addAll(newRsList);
+// }
+// }
+// if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
+// LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
+// String[] rsZnodeParts = watchedEvent.getPath().split("/");
+// transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+// }
+// }
+// }
+//
+// /**
+// * Get the directory where hlogs are archived
+// * @return the directory where hlogs are archived
+// */
+// public Path getOldLogDir() {
+// return this.oldLogDir;
+// }
+//
+// /**
+// * Get the directory where hlogs are stored by their RSs
+// * @return the directory where hlogs are stored by their RSs
+// */
+// public Path getLogDir() {
+// return this.logDir;
+// }
+//
+// /**
+// * Get the file system handle this manager was created with
+// * @return the file system handle this manager was created with
+// */
+// public FileSystem getFs() {
+// return this.fs;
+// }
+
+}