Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC

svn commit: r990018 [5/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/ bin/replication/ src/assembly/ src/docbkx/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/filter/ s...

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,494 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class serves as a helper for all things related to zookeeper
+ * in replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent
+ * for the master cluster:
+ * <p/>
+ * <pre>
+ * replication/
+ *  master     {contains a full cluster address}
+ *  state      {contains true or false}
+ *  clusterId  {contains a byte}
+ *  peers/
+ *    1/   {contains a full cluster address}
+ *    2/
+ *    ...
+ *  rs/ {lists all RS that replicate}
+ *    startcode1/ {lists all peer clusters}
+ *      1/ {lists hlogs to process}
+ *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ *        10.10.1.76%3A53488.123456790
+ *        ...
+ *      2/
+ *      ...
+ *    startcode2/
+ *    ...
+ * </pre>
+ */
+public class ReplicationZookeeperWrapper {
+  // REENABLE
+
+//  private static final Log LOG =
+//      LogFactory.getLog(ReplicationZookeeperWrapper.class);
+//  // Name of the znode we use to lock during failover
+//  private final static String RS_LOCK_ZNODE = "lock";
+//  /*
+//  // Our handle on zookeeper
+//  private final ZooKeeperWrapper zookeeperWrapper;
+//  // Map of addresses of peer clusters with their ZKW
+//  private final Map<String, ZooKeeperWrapper> peerClusters;*/
+//  // Path to the root replication znode
+//  private final String replicationZNode;
+//  // Path to the peer clusters znode
+//  private final String peersZNode;
+//  // Path to the znode that contains all RS that replicate
+//  private final String rsZNode;
+//  // Path to this region server's name under rsZNode
+//  private final String rsServerNameZnode;
+//  // Node name of the replicationState znode
+//  private final String replicationStateNodeName;
+//  // If this RS is part of a master cluster
+//  private final boolean replicationMaster;
+//  private final Configuration conf;
+//  // Is this cluster replicating at the moment?
+//  private final AtomicBoolean replicating;
+//  // Byte (stored as string here) that identifies this cluster
+//  private final String clusterId;
+//
+//  /**
+//   * Constructor used by region servers, connects to the peer cluster right away.
+//   *
+//   * @param zookeeperWrapper zkw to wrap
+//   * @param conf             conf to use
+//   * @param replicating    atomic boolean to start/stop replication
+//   * @param rsName      the name of this region server, null if this
+//   *                         wrapper is only used for its helper methods
+//   * @throws IOException
+//   */
+//  public ReplicationZookeeperWrapper(
+//      ZooKeeperWrapper zookeeperWrapper, Configuration conf,
+//      final AtomicBoolean replicating, String rsName) throws IOException {
+//    this.zookeeperWrapper = zookeeperWrapper;
+//    this.conf = conf;
+//    String replicationZNodeName =
+//        conf.get("zookeeper.znode.replication", "replication");
+//    String peersZNodeName =
+//        conf.get("zookeeper.znode.replication.peers", "peers");
+//    String repMasterZNodeName =
+//        conf.get("zookeeper.znode.replication.master", "master");
+//    this.replicationStateNodeName =
+//        conf.get("zookeeper.znode.replication.state", "state");
+//    String clusterIdZNodeName =
+//        conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+//    String rsZNodeName =
+//        conf.get("zookeeper.znode.replication.rs", "rs");
+//    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+//          this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+//          this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+//
+//    this.peerClusters = new HashMap<String, ZooKeeperWrapper>();
+//    this.replicationZNode = zookeeperWrapper.getZNode(
+//        zookeeperWrapper.getParentZNode(), replicationZNodeName);
+//    this.peersZNode =
+//        zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
+//    this.rsZNode =
+//        zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
+//
+//    this.replicating = replicating;
+//    setReplicating();
+//    String idResult = Bytes.toString(
+//        this.zookeeperWrapper.getData(this.replicationZNode,
+//        clusterIdZNodeName));
+//    this.clusterId =
+//        idResult == null ?
+//            Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
+//    String address = Bytes.toString(
+//        this.zookeeperWrapper.getData(this.replicationZNode,
+//          repMasterZNodeName));
+//    this.replicationMaster = thisCluster.equals(address);
+//    LOG.info("This cluster (" + thisCluster + ") is a "
+//          + (this.replicationMaster ? "master" : "slave") + " for replication" +
+//          ", compared with (" + address + ")");
+//    if (rsName != null) {
+//      this.rsServerNameZnode =
+//          this.zookeeperWrapper.getZNode(rsZNode, rsName);
+//      List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
+//          new ReplicationStatusWatcher());
+//      if (znodes != null) {
+//        for (String znode : znodes) {
+//          connectToPeer(znode);
+//        }
+//      }
+//    } else {
+//      this.rsServerNameZnode = null;
+//    }
+//
+//  }
+//
+//  /**
+//   * Returns all region servers from the given peer cluster
+//   *
+//   * @param peerClusterId (byte) the cluster to interrogate
+//   * @return addresses of all region servers
+//   */
+//  public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+//    if (this.peerClusters.size() == 0) {
+//      return new ArrayList<HServerAddress>(0);
+//    }
+//    ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
+//    return zkw == null?
+//        new ArrayList<HServerAddress>(0) : zkw.scanRSDirectory();
+//  }
+//
+//  /**
+//   * This method connects this cluster to another one and registers it
+//   * in this region server's replication znode
+//   * @param peerId id of the peer cluster
+//   */
+//  private void connectToPeer(String peerId) throws IOException {
+//    String[] ensemble =
+//        Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
+//            split(":");
+//    if (ensemble.length != 3) {
+//      throw new IllegalArgumentException("Wrong format of cluster address: " +
+//          this.zookeeperWrapper.getData(this.peersZNode, peerId));
+//    }
+//    Configuration otherConf = new Configuration(this.conf);
+//    otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+//    otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+//    otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+//    ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+//        "connection to cluster: " + peerId);
+//    zkw.registerListener(new ReplicationStatusWatcher());
+//    this.peerClusters.put(peerId, zkw);
+//    this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+//        this.rsServerNameZnode, peerId));
+//    LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+//  }
+//
+//  /**
+//   * This reads the state znode for replication and sets the atomic boolean
+//   */
+//  private void setReplicating() {
+//    String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
+//        this.replicationZNode, this.replicationStateNodeName,
+//        new ReplicationStatusWatcher()));
+//    if (value != null) {
+//      this.replicating.set(value.equals("true"));
+//      LOG.info("Replication is now " + (this.replicating.get() ?
+//          "started" : "stopped"));
+//    }
+//  }
+//
+//  /**
+//   * Add a new log to the list of hlogs in zookeeper
+//   * @param filename name of the hlog's znode
+//   * @param clusterId name of the cluster's znode
+//   */
+//  public void addLogToList(String filename, String clusterId) {
+//    try {
+//      this.zookeeperWrapper.writeZNode(
+//          this.zookeeperWrapper.getZNode(
+//              this.rsServerNameZnode, clusterId), filename, "");
+//    } catch (InterruptedException e) {
+//      LOG.error(e);
+//    } catch (KeeperException e) {
+//      LOG.error(e);
+//    }
+//  }
+//
+//  /**
+//   * Remove a log from the list of hlogs in zookeeper
+//   * @param filename name of the hlog's znode
+//   * @param clusterId name of the cluster's znode
+//   */
+//  public void removeLogFromList(String filename, String clusterId) {
+//    try {
+//      this.zookeeperWrapper.deleteZNode(
+//          this.zookeeperWrapper.getZNode(this.rsServerNameZnode,
+//              this.zookeeperWrapper.getZNode(clusterId, filename)));
+//    } catch (InterruptedException e) {
+//      LOG.error(e);
+//    } catch (KeeperException e) {
+//      LOG.error(e);
+//    }
+//  }
+//
+//  /**
+//   * Set the current position of the specified cluster in the current hlog
+//   * @param filename name of the hlog's znode
+//   * @param clusterId name of the cluster's znode
+//   * @param position the position in the file
+//   * @throws IOException
+//   */
+//  public void writeReplicationStatus(String filename, String clusterId,
+//                                     long position) {
+//    try {
+//      String clusterZNode = this.zookeeperWrapper.getZNode(
+//          this.rsServerNameZnode, clusterId);
+//      this.zookeeperWrapper.writeZNode(clusterZNode, filename,
+//          Long.toString(position));
+//    } catch (InterruptedException e) {
+//      LOG.error(e);
+//    } catch (KeeperException e) {
+//      LOG.error(e);
+//    }
+//  }
+//
+//  /**
+//   * Get a list of all the other region servers in this cluster
+//   * and set a watch
+//   * @param watch the watch to set
+//   * @return a list of server names
+//   */
+//  public List<String> getRegisteredRegionServers(Watcher watch) {
+//    return this.zookeeperWrapper.listZnodes(
+//        this.zookeeperWrapper.getRsZNode(), watch);
+//  }
+//
+//  /**
+//   * Get the list of replicators that have queues; they can be alive, dead,
+//   * or simply left over from a previous run
+//   * @param watch the watch to set
+//   * @return a list of server names
+//   */
+//  public List<String> getListOfReplicators(Watcher watch) {
+//    return this.zookeeperWrapper.listZnodes(rsZNode, watch);
+//  }
+//
+//  /**
+//   * Get the list of peer clusters for the specified region server
+//   * @param rs server name of the rs
+//   * @param watch the watch to set
+//   * @return a list of peer clusters
+//   */
+//  public List<String> getListPeersForRS(String rs, Watcher watch) {
+//    return this.zookeeperWrapper.listZnodes(
+//        zookeeperWrapper.getZNode(rsZNode, rs), watch);
+//  }
+//
+//  /**
+//   * Get the list of hlogs for the specified region server and peer cluster
+//   * @param rs server name of the rs
+//   * @param id peer cluster
+//   * @param watch the watch to set
+//   * @return a list of hlogs
+//   */
+//  public List<String> getListHLogsForPeerForRS(String rs, String id, Watcher watch) {
+//    return this.zookeeperWrapper.listZnodes(
+//        zookeeperWrapper.getZNode(zookeeperWrapper.getZNode(rsZNode, rs), id), watch);
+//  }
+//
+//  /**
+//   * Try to set a lock in another server's znode.
+//   * @param znode the server name of the other server
+//   * @return true if the lock was acquired, false in all other cases
+//   */
+//  public boolean lockOtherRS(String znode) {
+//    try {
+//      this.zookeeperWrapper.writeZNode(
+//          this.zookeeperWrapper.getZNode(this.rsZNode, znode),
+//          RS_LOCK_ZNODE, rsServerNameZnode, true);
+//
+//    } catch (InterruptedException e) {
+//      LOG.error(e);
+//      return false;
+//    } catch (KeeperException e) {
+//      LOG.debug("Won't lock " + znode + " because " + e.getMessage());
+//      // TODO see if the other still exists!!
+//      return false;
+//    }
+//    return true;
+//  }
+//
+//  /**
+//   * This method copies all the hlog queues from another region server
+//   * and returns them all sorted per peer cluster (with the dead
+//   * server's znode appended)
+//   * @param znode server name to copy from
+//   * @return all hlogs for all peers of that cluster, null if an error occurred
+//   */
+//  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+//    // TODO this method isn't atomic enough, we could start copying and then
+//    // TODO fail for some reason and we would end up with znodes we don't want.
+//    SortedMap<String,SortedSet<String>> queues =
+//        new TreeMap<String,SortedSet<String>>();
+//    try {
+//      String nodePath = this.zookeeperWrapper.getZNode(rsZNode, znode);
+//      List<String> clusters = this.zookeeperWrapper.listZnodes(nodePath, null);
+//      // We have a lock znode in there, it will count as one.
+//      if (clusters == null || clusters.size() <= 1) {
+//        return queues;
+//      }
+//      // The lock isn't a peer cluster, remove it
+//      clusters.remove(RS_LOCK_ZNODE);
+//      for (String cluster : clusters) {
+//        // We add the name of the recovered RS to the new znode, we can even
+//        // do that for queues that were recovered 10 times giving a znode like
+//        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+//        String newCluster = cluster+"-"+znode;
+//        String newClusterZnode =
+//            this.zookeeperWrapper.getZNode(rsServerNameZnode, newCluster);
+//        this.zookeeperWrapper.ensureExists(newClusterZnode);
+//        String clusterPath = this.zookeeperWrapper.getZNode(nodePath, cluster);
+//        List<String> hlogs = this.zookeeperWrapper.listZnodes(clusterPath, null);
+//        // That region server didn't have anything to replicate for this cluster
+//        if (hlogs == null || hlogs.size() == 0) {
+//          continue;
+//        }
+//        SortedSet<String> logQueue = new TreeSet<String>();
+//        queues.put(newCluster, logQueue);
+//        for (String hlog : hlogs) {
+//          String position = Bytes.toString(
+//              this.zookeeperWrapper.getData(clusterPath, hlog));
+//          LOG.debug("Creating " + hlog + " with data " + position);
+//          this.zookeeperWrapper.writeZNode(newClusterZnode, hlog, position);
+//          logQueue.add(hlog);
+//        }
+//      }
+//    } catch (InterruptedException e) {
+//      LOG.warn(e);
+//      return null;
+//    } catch (KeeperException e) {
+//      LOG.warn(e);
+//      return null;
+//    }
+//    return queues;
+//  }
+//
+//  /**
+//   * Delete a complete queue of hlogs
+//   * @param peerZnode znode of the peer cluster's queue of hlogs to delete
+//   */
+//  public void deleteSource(String peerZnode) {
+//    try {
+//      this.zookeeperWrapper.deleteZNode(
+//          this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
+//    } catch (InterruptedException e) {
+//      LOG.error(e);
+//    } catch (KeeperException e) {
+//      LOG.error(e);
+//    }
+//  }
+//
+//  /**
+//   * Recursively delete all znodes under the specified rs' znode
+//   * @param znode the region server's znode
+//   */
+//  public void deleteRsQueues(String znode) {
+//    try {
+//      this.zookeeperWrapper.deleteZNode(
+//          this.zookeeperWrapper.getZNode(rsZNode, znode), true);
+//    } catch (InterruptedException e) {
+//      LOG.error(e);
+//    } catch (KeeperException e) {
+//      LOG.error(e);
+//    }
+//  }
+//
+//  /**
+//   * Delete this cluster's queues
+//   */
+//  public void deleteOwnRSZNode() {
+//    deleteRsQueues(this.rsServerNameZnode);
+//  }
+//
+//  /**
+//   * Get the position of the specified hlog in the specified peer znode
+//   * @param peerId znode of the peer cluster
+//   * @param hlog name of the hlog
+//   * @return the position in that hlog
+//   */
+//  public long getHLogRepPosition(String peerId, String hlog) {
+//    String clusterZnode =
+//        this.zookeeperWrapper.getZNode(rsServerNameZnode, peerId);
+//    String data = Bytes.toString(
+//        this.zookeeperWrapper.getData(clusterZnode, hlog));
+//    return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+//  }
+//
+//  /**
+//   * Tells if this cluster is a replication master or not
+//   *
+//   * @return true if this is a master
+//   */
+//  public boolean isReplicationMaster() {
+//    return this.replicationMaster;
+//  }
+//
+//  /**
+//   * Get the identification of the cluster
+//   *
+//   * @return the id for the cluster
+//   */
+//  public String getClusterId() {
+//    return this.clusterId;
+//  }
+//
+//  /**
+//   * Get a map of all peer clusters
+//   * @return map of peer clusters, peer id to ZKW
+//   */
+//  public Map<String, ZooKeeperWrapper> getPeerClusters() {
+//    return this.peerClusters;
+//  }
+//
+//  /**
+//   * Watcher for the status of the replication
+//   */
+//  public class ReplicationStatusWatcher implements Watcher {
+//    @Override
+//    public void process(WatchedEvent watchedEvent) {
+//      Event.EventType type = watchedEvent.getType();
+//      LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
+//      if (type.equals(Event.EventType.NodeDataChanged)) {
+//        setReplicating();
+//      }
+//    }
+//  }
+
+}
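
The layout sketched in the class comment above can be inspected with the plain
ZooKeeper client. The snippet below is only an illustrative sketch, not part of
this commit; the quorum string and the "/hbase/replication" base path are
assumptions that must match the cluster's zookeeper.znode.parent and
zookeeper.znode.replication settings.

    import java.util.List;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class ReplicationLayoutDump {
      public static void main(String[] args) throws Exception {
        // Assumed quorum and base znode; adjust to the cluster being inspected.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
          public void process(WatchedEvent event) { /* no-op */ }
        });
        String base = "/hbase/replication";
        // "state" holds "true" or "false": is replication currently running?
        byte[] state = zk.getData(base + "/state", false, null);
        System.out.println("state=" + new String(state));
        // Each child of "rs" is a region server that has queues to replicate.
        for (String rs : zk.getChildren(base + "/rs", false)) {
          // Each child of a region server znode is a peer cluster id, and each
          // child of that is an hlog znode, optionally holding a position.
          for (String peer : zk.getChildren(base + "/rs/" + rs, false)) {
            List<String> hlogs =
                zk.getChildren(base + "/rs/" + rs + "/" + peer, false);
            System.out.println(rs + "/" + peer + " -> " + hlogs);
          }
        }
        zk.close();
      }
    }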

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LogCleanerDelegate;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for
+ * replication before deleting it when its TTL is over.
+ */
+public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
+
+  private static final Log LOG =
+    LogFactory.getLog(ReplicationLogCleaner.class);
+  private Configuration conf;
+  private ReplicationZookeeperWrapper zkHelper;
+  private Set<String> hlogs = new HashSet<String>();
+
+  /**
+   * Instantiates the cleaner, does nothing more.
+   */
+  public ReplicationLogCleaner() {}
+
+  @Override
+  public boolean isLogDeletable(Path filePath) {
+    String log = filePath.getName();
+    // If we saw the hlog previously, let's consider it's still used
+    // At some point in the future we will refresh the list and it will be gone
+    if (this.hlogs.contains(log)) {
+      return false;
+    }
+
+    // Let's see if it's still there
+    // This solution makes every miss very expensive to process since we
+    // almost completely refresh the cache each time
+    return !refreshHLogsAndSearch(log);
+  }
+
+  /**
+   * Search through all the hlogs we have in ZK to refresh the cache
+   * If a log is specified and found, then we early out and return true
+   * @param searchedLog log we are searching for, pass null to cache everything
+   *                    that's in zookeeper.
+   * @return true if the specified log was found, false otherwise.
+   */
+  private boolean refreshHLogsAndSearch(String searchedLog) {
+    this.hlogs.clear();
+    final boolean lookForLog = searchedLog != null;
+// REENABLE
+//    List<String> rss = zkHelper.getListOfReplicators(this);
+//    if (rss == null) {
+//      LOG.debug("Didn't find any region server that replicates, deleting: " +
+//          searchedLog);
+//      return false;
+//    }
+//    for (String rs: rss) {
+//      List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
+//      // if rs just died, this will be null
+//      if (listOfPeers == null) {
+//        continue;
+//      }
+//      for (String id : listOfPeers) {
+//        List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
+//        if (peersHlogs != null) {
+//          this.hlogs.addAll(peersHlogs);
+//        }
+//        // early exit if we found the log
+//        if(lookForLog && this.hlogs.contains(searchedLog)) {
+//          LOG.debug("Found log in ZK, keeping: " + searchedLog);
+//          return true;
+//        }
+//      }
+//    }
+    LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
+    return false;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+//    try {
+      // REENABLE
+//      this.zkHelper = new ReplicationZookeeperWrapper(
+//          ZooKeeperWrapper.createInstance(this.conf,
+//              HMaster.class.getName()),
+//          this.conf, new AtomicBoolean(true), null);
+//    } catch (IOException e) {
+//      LOG.error(e);
+//    }
+    refreshHLogsAndSearch(null);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void process(WatchedEvent watchedEvent) {}
+}
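
isLogDeletable() above first consults a cached set of hlogs known to be queued
for replication and only falls back to the expensive zookeeper scan on a miss.
Below is a minimal sketch of that check-then-refresh pattern;
listReplicatedLogs() is a hypothetical stand-in for listing the hlog znodes
from zookeeper, not an API of this class.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public abstract class CachedLogCheck {
      private final Set<String> known = new HashSet<String>();

      /** Hypothetical hook standing in for the zookeeper listing. */
      protected abstract List<String> listReplicatedLogs();

      public boolean isDeletable(String logName) {
        // Cache hit: the log is (or recently was) queued for replication.
        if (known.contains(logName)) {
          return false;
        }
        // Cache miss: rebuild the whole cache, which is the expensive path.
        known.clear();
        known.addAll(listReplicatedLogs());
        return !known.contains(logName);
      }
    }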

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Replication serves as an umbrella over the setup of replication and
+ * is used by HRS.
+ */
+public class Replication implements LogEntryVisitor {
+
+  private final boolean replication;
+// REENABLE  private final ReplicationSourceManager replicationManager;
+  private boolean replicationMaster;
+  private final AtomicBoolean replicating = new AtomicBoolean(true);
+// REENABLE  private final ReplicationZookeeperWrapper zkHelper;
+  private final Configuration conf;
+  private ReplicationSink replicationSink;
+  private final Server server;
+
+  /**
+   * Instantiate the replication management (if rep is enabled).
+   * @param server the server this replication is set up for
+   * @param fs handle to the filesystem
+   * @param logDir directory where the active logs live
+   * @param oldLogDir directory where logs are archived
+   * @throws IOException
+   */
+  public Replication(final Server server, FileSystem fs, Path logDir,
+      Path oldLogDir) throws IOException {
+    this.server = server;
+    this.conf = this.server.getConfiguration();
+    this.replication =
+        conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+    if (replication) {
+      // REENABLE
+//      this.zkHelper = new ReplicationZookeeperWrapper(
+//        ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
+//        this.replicating, hsi.getServerName());
+//      this.replicationMaster = zkHelper.isReplicationMaster();
+//      this.replicationManager = this.replicationMaster ?
+//        new ReplicationSourceManager(zkHelper, conf, stopRequested,
+//          fs, this.replicating, logDir, oldLogDir) : null;
+    } else {
+ // REENABLE     replicationManager = null;
+ // REENABLE     zkHelper = null;
+    }
+  }
+
+  /**
+   * Join with the replication threads
+   */
+  public void join() {
+    if (this.replication) {
+      if (this.replicationMaster) {
+// REENABLE        this.replicationManager.join();
+      }
+// REENABLE      this.zkHelper.deleteOwnRSZNode();
+    }
+  }
+
+  /**
+   * Carry on the list of log entries down to the sink
+   * @param entries list of entries to replicate
+   * @throws IOException
+   */
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+    if (this.replication && !this.replicationMaster) {
+      this.replicationSink.replicateEntries(entries);
+    }
+  }
+
+  /**
+   * If replication is enabled and this cluster is a master, it starts
+   * the replication manager; otherwise it starts a replication sink
+   * @throws IOException
+   */
+  public void startReplicationServices() throws IOException {
+    if (this.replication) {
+      if (this.replicationMaster) {
+        // REENABLE          this.replicationManager.init();
+      } else {
+        this.replicationSink =
+            new ReplicationSink(this.conf, this.server);
+      }
+    }
+  }
+
+  /**
+   * Get the replication sources manager
+   * @return the manager if replication is enabled, else returns null
+   */
+  public ReplicationSourceManager getReplicationManager() {
+    return null; // REENABLE   replicationManager;
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+                                       WALEdit logEdit) {
+    NavigableMap<byte[], Integer> scopes =
+        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    byte[] family;
+    for (KeyValue kv : logEdit.getKeyValues()) {
+      family = kv.getFamily();
+      int scope = info.getTableDesc().getFamily(family).getScope();
+      if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
+          !scopes.containsKey(family)) {
+        scopes.put(family, scope);
+      }
+    }
+    if (!scopes.isEmpty()) {
+      logEdit.setScopes(scopes);
+    }
+  }
+
+  /**
+   * Add this class as a log entry visitor for HLog if replication is enabled
+   * @param hlog the log to add ourselves to as a visitor
+   */
+  public void addLogEntryVisitor(HLog hlog) {
+    if (replication) {
+      hlog.addLogEntryVisitor(this);
+    }
+  }
+}
\ No newline at end of file
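
visitLogEntryBeforeWrite() above tags each WALEdit with the replication scope
of every non-local column family it touches, recording each family only once.
Below is a simplified, self-contained sketch of that selection rule; String
keys replace byte[] purely for brevity and the scope constants mirror
HConstants.REPLICATION_SCOPE_LOCAL/GLOBAL.

    import java.util.Map;
    import java.util.TreeMap;

    public class ScopeExample {
      static final int SCOPE_LOCAL = 0;   // not shipped to peers
      static final int SCOPE_GLOBAL = 1;  // shipped to peers

      /** Families that should travel with one edit, keyed to their scope. */
      static Map<String, Integer> scopesFor(String[] familiesInEdit,
          Map<String, Integer> familyScope) {
        Map<String, Integer> scopes = new TreeMap<String, Integer>();
        for (String family : familiesInEdit) {
          int scope = familyScope.get(family);
          // Only non-local families are recorded, and each family only once.
          if (scope != SCOPE_LOCAL && !scopes.containsKey(family)) {
            scopes.put(family, scope);
          }
        }
        return scopes;
      }
    }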

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.Stoppable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster.
+ * <p/>
+ * This replication process waits for the edits to be applied
+ * before the method returns. This means that the replication of edits
+ * is synchronized (after reading from HLogs in ReplicationSource) and that a
+ * single region server cannot receive edits from two sources at the same time.
+ * <p/>
+ * This class uses the native HBase client in order to replicate entries.
+ * <p/>
+ *
+ * TODO make this class more like ReplicationSource wrt log handling
+ */
+public class ReplicationSink {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+  // Name of the HDFS directory that contains the temporary rep logs
+  public static final String REPLICATION_LOG_DIR = ".replogs";
+  private final Configuration conf;
+  // Pool used to replicate
+  private final HTablePool pool;
+  // Chain to pull on when we want all to stop.
+  private final Stoppable stopper;
+  private final ReplicationSinkMetrics metrics;
+
+  /**
+   * Create a sink for replication
+   *
+   * @param conf                conf object
+   * @param stopper             the Stoppable to invoke when edits cannot be applied
+   * @throws IOException thrown when HDFS goes bad or on a bad file name
+   */
+  public ReplicationSink(Configuration conf, Stoppable stopper)
+      throws IOException {
+    this.conf = conf;
+    this.pool = new HTablePool(this.conf,
+        conf.getInt("replication.sink.htablepool.capacity", 10));
+    this.stopper = stopper;
+    this.metrics = new ReplicationSinkMetrics();
+  }
+
+  /**
+   * Replicate this array of entries directly into the local cluster
+   * using the native client.
+   *
+   * @param entries the entries to replicate
+   * @throws IOException
+   */
+  public synchronized void replicateEntries(HLog.Entry[] entries)
+      throws IOException {
+    if (entries.length == 0) {
+      return;
+    }
+    // Very simple optimization where we batch sequences of rows going
+    // to the same table.
+    try {
+      long totalReplicated = 0;
+      byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+      List<Put> puts = new ArrayList<Put>();
+      for (HLog.Entry entry : entries) {
+        WALEdit edit = entry.getEdit();
+        List<KeyValue> kvs = edit.getKeyValues();
+        if (kvs.get(0).isDelete()) {
+          Delete delete = new Delete(kvs.get(0).getRow(),
+              kvs.get(0).getTimestamp(), null);
+          for (KeyValue kv : kvs) {
+            if (kv.isDeleteFamily()) {
+              delete.deleteFamily(kv.getFamily());
+            } else if (!kv.isEmptyColumn()) {
+              delete.deleteColumn(kv.getFamily(),
+                  kv.getQualifier());
+            }
+          }
+          delete(entry.getKey().getTablename(), delete);
+        } else {
+          // Switching table, flush
+          if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
+            put(lastTable, puts);
+          }
+          // With mini-batching, we need to expect multiple rows per edit
+          byte[] lastKey = kvs.get(0).getRow();
+          Put put = new Put(kvs.get(0).getRow(),
+              kvs.get(0).getTimestamp());
+          for (KeyValue kv : kvs) {
+            if (!Bytes.equals(lastKey, kv.getRow())) {
+              puts.add(put);
+              put = new Put(kv.getRow(), kv.getTimestamp());
+            }
+            put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
+            lastKey = kv.getRow();
+          }
+          puts.add(put);
+          lastTable = entry.getKey().getTablename();
+        }
+        totalReplicated++;
+      }
+      put(lastTable, puts);
+      this.metrics.setAgeOfLastAppliedOp(
+          entries[entries.length-1].getKey().getWriteTime());
+      this.metrics.appliedBatchesRate.inc(1);
+      LOG.info("Total replicated: " + totalReplicated);
+    } catch (IOException ex) {
+      if (ex.getCause() instanceof TableNotFoundException) {
+        LOG.warn("Losing edits because: ", ex);
+      } else {
+        // Should we log rejected edits in a file for replay?
+        LOG.error("Unable to accept edit because", ex);
+        this.stopper.stop("Unable to accept edit because " + ex.getMessage());
+        throw ex;
+      }
+    } catch (RuntimeException re) {
+      if (re.getCause() instanceof TableNotFoundException) {
+        LOG.warn("Losing edits because: ", re);
+      } else {
+        this.stopper.stop("Replication stopped us because " + re.getMessage());
+        throw re;
+      }
+    }
+  }
+
+  /**
+   * Do the puts and handle the pool
+   * @param tableName table to insert into
+   * @param puts list of puts
+   * @throws IOException
+   */
+  private void put(byte[] tableName, List<Put> puts) throws IOException {
+    if (puts.isEmpty()) {
+      return;
+    }
+    HTableInterface table = null;
+    try {
+      table = this.pool.getTable(tableName);
+      table.put(puts);
+      this.metrics.appliedOpsRate.inc(puts.size());
+      puts.clear();
+    } finally {
+      if (table != null) {
+        this.pool.putTable(table);
+      }
+    }
+  }
+
+  /**
+   * Do the delete and handle the pool
+   * @param tableName table to delete in
+   * @param delete the delete to use
+   * @throws IOException
+   */
+  private void delete(byte[] tableName, Delete delete) throws IOException {
+    HTableInterface table = null;
+    try {
+      table = this.pool.getTable(tableName);
+      table.delete(delete);
+      this.metrics.appliedOpsRate.inc(1);
+    } finally {
+      if (table != null) {
+        this.pool.putTable(table);
+      }
+    }
+  }
+}
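
replicateEntries() above batches consecutive puts destined for the same table
and flushes the batch whenever the table changes, plus once more after the
loop. Below is a stripped-down sketch of that grouping logic on plain strings;
the "table:row" entries are made-up data standing in for WAL entries.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BatchByTable {
      public static void main(String[] args) {
        List<String> entries = Arrays.asList("t1:a", "t1:b", "t2:c", "t1:d");
        String lastTable = null;
        List<String> batch = new ArrayList<String>();
        for (String entry : entries) {
          String[] parts = entry.split(":");
          // Table switched: ship whatever accumulated for the previous table.
          if (lastTable != null && !lastTable.equals(parts[0])) {
            ship(lastTable, batch);
          }
          batch.add(parts[1]);
          lastTable = parts[0];
        }
        if (lastTable != null) {
          ship(lastTable, batch); // final flush, like the put() after the loop
        }
      }

      // Stands in for put(byte[] tableName, List<Put> puts) above.
      static void ship(String table, List<String> rows) {
        System.out.println("put " + rows + " into " + table);
        rows.clear();
      }
    }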

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.metrics.MetricsRate;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsLongValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+/**
+ * This class is for maintaining the various replication statistics
+ * for a sink and publishing them through the metrics interfaces.
+ */
+public class ReplicationSinkMetrics implements Updater {
+  private final MetricsRecord metricsRecord;
+  private MetricsRegistry registry = new MetricsRegistry();
+  private static ReplicationSinkMetrics instance;
+
+  /** Rate of operations applied by the sink */
+  public final MetricsRate appliedOpsRate =
+      new MetricsRate("appliedOpsRate", registry);
+
+  /** Rate of batches (of operations) applied by the sink */
+  public final MetricsRate appliedBatchesRate =
+      new MetricsRate("appliedBatchesRate", registry);
+
+  /** Age of the last operation that was applied by the sink */
+  private final MetricsLongValue ageOfLastAppliedOp =
+      new MetricsLongValue("ageOfLastAppliedOp", registry);
+
+  /**
+   * Constructor used to register the metrics
+   */
+  public ReplicationSinkMetrics() {
+    MetricsContext context = MetricsUtil.getContext("hbase");
+    String name = Thread.currentThread().getName();
+    metricsRecord = MetricsUtil.createRecord(context, "replication");
+    metricsRecord.setTag("RegionServer", name);
+    context.registerUpdater(this);
+    // export for JMX
+    new ReplicationStatistics(this.registry, "ReplicationSink");
+  }
+
+  /**
+   * Set the age of the last edit that was applied
+   * @param timestamp write time of the edit
+   */
+  public void setAgeOfLastAppliedOp(long timestamp) {
+    ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp);
+  }
+  @Override
+  public void doUpdates(MetricsContext metricsContext) {
+    synchronized (this) {
+      this.appliedOpsRate.pushMetric(this.metricsRecord);
+      this.appliedBatchesRate.pushMetric(this.metricsRecord);
+      this.ageOfLastAppliedOp.pushMetric(this.metricsRecord);
+    }
+    this.metricsRecord.update();
+  }
+}
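
The ageOfLastAppliedOp gauge above is fed by the sink with the write time of
the last edit it applied, so the recorded value is simply "now minus write
time", a rough measure of the sink's replication lag at the end of each batch.
A minimal sketch of that computation (the 5 second offset is made-up data):

    public class LagExample {
      public static void main(String[] args) {
        // Pretend the last applied edit was written 5 seconds ago.
        long writeTime = System.currentTimeMillis() - 5000L;
        long ageOfLastAppliedOp = System.currentTimeMillis() - writeTime;
        System.out.println("ageOfLastAppliedOp ~ " + ageOfLastAppliedOp + " ms");
      }
    }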

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,696 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Class that handles the source of a replication stream.
+ * Currently does not handle more than 1 slave.
+ * For each slave cluster it selects a random number of peers
+ * using a replication ratio. For example, if the replication ratio is 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ * <p/>
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ * <p/>
+ *
+ */
+public class ReplicationSource extends Thread
+    implements ReplicationSourceInterface {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+  // Queue of logs to process
+  private PriorityBlockingQueue<Path> queue;
+  // container of entries to replicate
+  private HLog.Entry[] entriesArray;
+  private HConnection conn;
+  // Helper class for zookeeper
+  private ReplicationZookeeperWrapper zkHelper;
+  private Configuration conf;
+  // ratio of region servers to choose from a slave cluster
+  private float ratio;
+  private Random random;
+  // should we replicate or not?
+  private AtomicBoolean replicating;
+  // id of the peer cluster this source replicates to
+  private String peerClusterId;
+  // The manager of all sources to which we ping back our progress
+  private ReplicationSourceManager manager;
+  // Should we stop everything?
+  private AtomicBoolean stop;
+  // List of chosen sinks (region servers)
+  private List<HServerAddress> currentPeers;
+  // How long should we sleep for each retry
+  private long sleepForRetries;
+  // Max size in bytes of entriesArray
+  private long replicationQueueSizeCapacity;
+  // Max number of entries in entriesArray
+  private int replicationQueueNbCapacity;
+  // Our reader for the current log
+  private HLog.Reader reader;
+  // Current position in the log
+  private long position = 0;
+  // Path of the current log
+  private volatile Path currentPath;
+  private FileSystem fs;
+  // id of this cluster
+  private byte clusterId;
+  // total number of edits we replicated
+  private long totalReplicatedEdits = 0;
+  // The znode we currently play with
+  private String peerClusterZnode;
+  // Indicates if this queue is recovered (and will be deleted when depleted)
+  private boolean queueRecovered;
+  // List of all the dead region servers that had this queue (if recovered)
+  private String[] deadRegionServers;
+  // Maximum number of retries before taking bold actions
+  private long maxRetriesMultiplier;
+  // Current number of entries that we need to replicate
+  private int currentNbEntries = 0;
+  // Current number of operations (Put/Delete) that we need to replicate
+  private int currentNbOperations = 0;
+  // Indicates if this particular source is running
+  private volatile boolean running = true;
+  // Metrics for this source
+  private ReplicationSourceMetrics metrics;
+
+  /**
+   * Instantiation method used by region servers
+   *
+   * @param conf configuration to use
+   * @param fs file system to use
+   * @param manager replication manager to ping to
+   * @param stopper     the atomic boolean to use to stop the regionserver
+   * @param replicating the atomic boolean that starts/stops replication
+   * @param peerClusterZnode the name of our znode
+   * @throws IOException
+   */
+  public void init(final Configuration conf,
+                   final FileSystem fs,
+                   final ReplicationSourceManager manager,
+                   final AtomicBoolean stopper,
+                   final AtomicBoolean replicating,
+                   final String peerClusterZnode)
+      throws IOException {
+    this.stop = stopper;
+    this.conf = conf;
+    this.replicationQueueSizeCapacity =
+        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
+    this.replicationQueueNbCapacity =
+        this.conf.getInt("replication.source.nb.capacity", 25000);
+    this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
+    for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
+      this.entriesArray[i] = new HLog.Entry();
+    }
+    this.maxRetriesMultiplier =
+        this.conf.getLong("replication.source.maxretriesmultiplier", 10);
+    this.queue =
+        new PriorityBlockingQueue<Path>(
+            conf.getInt("hbase.regionserver.maxlogs", 32),
+            new LogsComparator());
+    this.conn = HConnectionManager.getConnection(conf);
+ // REENABLE   this.zkHelper = manager.getRepZkWrapper();
+    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
+    this.currentPeers = new ArrayList<HServerAddress>();
+    this.random = new Random();
+    this.replicating = replicating;
+    this.manager = manager;
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);
+    this.fs = fs;
+ // REENABLE   this.clusterId = Byte.valueOf(zkHelper.getClusterId());
+    this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
+
+    // Finally look if this is a recovered queue
+    this.checkIfQueueRecovered(peerClusterZnode);
+  }
+
+  // The passed znode will be either the id of the peer cluster or
+  // the handling history of that queue in the form of id-servername-*
+  private void checkIfQueueRecovered(String peerClusterZnode) {
+    String[] parts = peerClusterZnode.split("-");
+    this.queueRecovered = parts.length != 1;
+    this.peerClusterId = this.queueRecovered ?
+        parts[0] : peerClusterZnode;
+    this.peerClusterZnode = peerClusterZnode;
+    this.deadRegionServers = new String[parts.length-1];
+    // Extract all the places where we could find the hlogs
+    for (int i = 1; i < parts.length; i++) {
+      this.deadRegionServers[i-1] = parts[i];
+    }
+  }
+
+  /**
+   * Select a number of peers at random using the ratio. Minimum 1.
+   */
+  private void chooseSinks() {
+    // REENABLE
+//    this.currentPeers.clear();
+//    List<HServerAddress> addresses =
+//        this.zkHelper.getPeersAddresses(peerClusterId);
+//    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
+//    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
+//    LOG.info("Getting " + nbPeers +
+//        " rs from peer cluster # " + peerClusterId);
+//    for (int i = 0; i < nbPeers; i++) {
+//      HServerAddress address;
+//      // Make sure we get one address that we don't already have
+//      do {
+//        address = addresses.get(this.random.nextInt(addresses.size()));
+//      } while (setOfAddr.contains(address));
+//      LOG.info("Choosing peer " + address);
+//      setOfAddr.add(address);
+//    }
+//    this.currentPeers.addAll(setOfAddr);
+  }
+
+  @Override
+  public void enqueueLog(Path log) {
+    this.queue.put(log);
+    this.metrics.sizeOfLogQueue.set(queue.size());
+  }
+
+  @Override
+  public void run() {
+    connectToPeers();
+    // We were stopped while looping to connect to sinks, just abort
+    if (this.stop.get()) {
+      return;
+    }
+    // If this is recovered, the queue is already full and the first log
+    // normally has a position (unless the RS failed between 2 logs)
+    if (this.queueRecovered) {
+//      this.position = this.zkHelper.getHLogRepPosition(
+//          this.peerClusterZnode, this.queue.peek().getName());
+    }
+    int sleepMultiplier = 1;
+    // Loop until we close down
+    while (!stop.get() && this.running) {
+      // Get a new path
+      if (!getNextPath()) {
+        if (sleepForRetries("No log to process", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+      // Open a reader on it
+      if (!openReader(sleepMultiplier)) {
+        // Reset the sleep multiplier, else it'd be reused for the next file
+        sleepMultiplier = 1;
+        continue;
+      }
+
+      // If we got a null reader but didn't continue, then sleep and continue
+      if (this.reader == null) {
+        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+
+      boolean gotIOE = false;
+      currentNbEntries = 0;
+      try {
+        if(readAllEntriesToReplicateOrNextFile()) {
+          continue;
+        }
+      } catch (IOException ioe) {
+        LOG.warn(peerClusterZnode + " Got: ", ioe);
+        gotIOE = true;
+        if (ioe.getCause() instanceof EOFException) {
+
+          boolean considerDumping = false;
+          if (this.queueRecovered) {
+            try {
+              FileStatus stat = this.fs.getFileStatus(this.currentPath);
+              if (stat.getLen() == 0) {
+                LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+              }
+              considerDumping = true;
+            } catch (IOException e) {
+              LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+            }
+          } else if (currentNbEntries != 0) {
+            LOG.warn(peerClusterZnode + " Got EOF while reading, " +
+                "looks like this file is broken? " + currentPath);
+            considerDumping = true;
+            currentNbEntries = 0;
+          }
+
+          if (considerDumping &&
+              sleepMultiplier == this.maxRetriesMultiplier &&
+              processEndOfFile()) {
+            continue;
+          }
+        }
+      } finally {
+        try {
+          // if current path is null, it means we called processEndOfFile, hence the check
+          if (this.currentPath != null && !gotIOE) {
+            this.position = this.reader.getPosition();
+          }
+          if (this.reader != null) {
+            this.reader.close();
+          }
+        } catch (IOException e) {
+          gotIOE = true;
+          LOG.warn("Unable to finalize the tailing of a file", e);
+        }
+      }
+
+      // If we didn't get anything to replicate, or if we hit a IOE,
+      // wait a bit and retry.
+      // But if we need to stop, don't bother sleeping
+      if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
+        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+      sleepMultiplier = 1;
+      shipEdits();
+
+    }
+    LOG.debug("Source exiting " + peerClusterId);
+  }
+
+  /**
+   * Read all the entries from the current log file and retain those
+   * that need to be replicated; if nothing was read, process the end of the
+   * current file instead.
+   * @return true if we got nothing and went to the next file, false if we got
+   * entries
+   * @throws IOException
+   */
+  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+    long seenEntries = 0;
+    if (this.position != 0) {
+      this.reader.seek(this.position);
+    }
+    HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
+    while (entry != null) {
+      WALEdit edit = entry.getEdit();
+      this.metrics.logEditsReadRate.inc(1);
+      seenEntries++;
+      // Remove all KVs that should not be replicated
+      removeNonReplicableEdits(edit);
+      HLogKey logKey = entry.getKey();
+      // Don't replicate catalog entries, WALEdits that have nothing left
+      // to replicate, or anything while replication is disabled
+      if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
+          Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
+          edit.size() != 0 && replicating.get()) {
+        logKey.setClusterId(this.clusterId);
+        currentNbOperations += countDistinctRowKeys(edit);
+        currentNbEntries++;
+      } else {
+        this.metrics.logEditsFilteredRate.inc(1);
+      }
+      // Stop if too many entries or too big
+      if ((this.reader.getPosition() - this.position)
+          >= this.replicationQueueSizeCapacity ||
+          currentNbEntries >= this.replicationQueueNbCapacity) {
+        break;
+      }
+      entry = this.reader.next(entriesArray[currentNbEntries]);
+    }
+    LOG.debug("currentNbOperations:" + currentNbOperations +
+        " and seenEntries:" + seenEntries +
+        " and size: " + (this.reader.getPosition() - this.position));
+    // If we didn't get anything and the queue has an object, it means we
+    // hit the end of the file for sure
+    return seenEntries == 0 && processEndOfFile();
+  }
+
+  private void connectToPeers() {
+    // Connect to peer cluster first, unless we have to stop
+    while (!this.stop.get() && this.currentPeers.size() == 0) {
+      try {
+        chooseSinks();
+        Thread.sleep(this.sleepForRetries);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while trying to connect to sinks", e);
+      }
+    }
+  }
+
+  /**
+   * Poll for the next path
+   * @return true if a path was obtained, false if not
+   */
+  protected boolean getNextPath() {
+    try {
+      if (this.currentPath == null) {
+        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
+        this.metrics.sizeOfLogQueue.set(queue.size());
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while reading edits", e);
+    }
+    return this.currentPath != null;
+  }
+
+  /**
+   * Open a reader on the current path
+   *
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return true if we should continue with that file, false if we are done with it
+   */
+  protected boolean openReader(int sleepMultiplier) {
+    // REENABLE
+//    try {
+//      LOG.info("Opening log for replication " + this.currentPath.getName() +
+//          " at " + this.position);
+//      try {
+//       this.reader = null;
+//       this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+//      } catch (FileNotFoundException fnfe) {
+//        if (this.queueRecovered) {
+//          // We didn't find the log in the archive directory, look if it still
+//          // exists in the dead RS folder (there could be a chain of failures
+//          // to look at)
+//          for (int i = this.deadRegionServers.length - 1; i > 0; i--) {
+//            Path deadRsDirectory =
+//                new Path(this.manager.getLogDir(), this.deadRegionServers[i]);
+//            Path possibleLogLocation =
+//                new Path(deadRsDirectory, currentPath.getName());
+//            if (this.manager.getFs().exists(possibleLogLocation)) {
+//              // We found the right new location
+//              LOG.info("Log " + this.currentPath + " still exists at " +
+//                  possibleLogLocation);
+//              // Breaking here will make us sleep since reader is null
+//              break;
+//            }
+//          }
+//          // TODO What happens if the log was missing from every single location?
+//          // Although we need to check a couple of times as the log could have
+//          // been moved by the master between the checks
+//        } else {
+//          // If the log was archived, continue reading from there
+//          Path archivedLogLocation =
+//              new Path(manager.getOldLogDir(), currentPath.getName());
+//          if (this.manager.getFs().exists(archivedLogLocation)) {
+//            currentPath = archivedLogLocation;
+//            LOG.info("Log " + this.currentPath + " was moved to " +
+//                archivedLogLocation);
+//            // Open the log at the new location
+//            this.openReader(sleepMultiplier);
+//
+//          }
+//          // TODO What happens if the log is missing in both places?
+//        }
+//      }
+//    } catch (IOException ioe) {
+//      LOG.warn(peerClusterZnode + " Got: ", ioe);
+//      // TODO Need a better way to determinate if a file is really gone but
+//      // TODO without scanning all logs dir
+//      if (sleepMultiplier == this.maxRetriesMultiplier) {
+//        LOG.warn("Waited too long for this file, considering dumping");
+//        return !processEndOfFile();
+//      }
+//    }
+    return true;
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return true if sleepMultiplier is lower than maxRetriesMultiplier, false otherwise
+   */
+  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+
+  /**
+   * We only want KVs that are scoped other than local
+   * @param edit The WALEdit to filter for replication
+   */
+  protected void removeNonReplicableEdits(WALEdit edit) {
+    NavigableMap<byte[], Integer> scopes = edit.getScopes();
+    List<KeyValue> kvs = edit.getKeyValues();
+    for (int i = 0; i < edit.size(); i++) {
+      KeyValue kv = kvs.get(i);
+      // The scope will be null or empty if
+      // there's nothing to replicate in that WALEdit
+      if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+        kvs.remove(i);
+        i--;
+      }
+    }
+  }
+
+  /**
+   * Count the number of different row keys in the given edit because of
+   * mini-batching. We assume that there's at least one KV in the WALEdit.
+   * @param edit edit to count row keys from
+   * @return number of different row keys
+   */
+  private int countDistinctRowKeys(WALEdit edit) {
+    List<KeyValue> kvs = edit.getKeyValues();
+    int distinctRowKeys = 1;
+    KeyValue lastKV = kvs.get(0);
+    for (int i = 0; i < edit.size(); i++) {
+      if (!kvs.get(i).matchingRow(lastKV)) {
+        distinctRowKeys++;
+        lastKV = kvs.get(i);
+      }
+    }
+    return distinctRowKeys;
+  }
+
+  /**
+   * Do the shipping logic
+   */
+  protected void shipEdits() {
+    // REENABLE
+//    int sleepMultiplier = 1;
+//    while (!stop.get()) {
+//      try {
+//        HRegionInterface rrs = getRS();
+//        LOG.debug("Replicating " + currentNbEntries);
+//        rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
+//        this.manager.logPositionAndCleanOldLogs(this.currentPath,
+//            this.peerClusterZnode, this.position, queueRecovered);
+//        this.totalReplicatedEdits += currentNbEntries;
+//        this.metrics.shippedBatchesRate.inc(1);
+//        this.metrics.shippedOpsRate.inc(
+//            this.currentNbOperations);
+//        this.metrics.setAgeOfLastShippedOp(
+//            this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
+//        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
+//        break;
+//
+//      } catch (IOException ioe) {
+//        LOG.warn("Unable to replicate because ", ioe);
+//        try {
+//          boolean down;
+//          do {
+//            down = isSlaveDown();
+//            if (down) {
+//              LOG.debug("The region server we tried to ping didn't answer, " +
+//                  "sleeping " + sleepForRetries + " times " + sleepMultiplier);
+//              Thread.sleep(this.sleepForRetries * sleepMultiplier);
+//              if (sleepMultiplier < maxRetriesMultiplier) {
+//                sleepMultiplier++;
+//              } else {
+//                chooseSinks();
+//              }
+//            }
+//          } while (!stop.get() && down);
+//        } catch (InterruptedException e) {
+//          LOG.debug("Interrupted while trying to contact the peer cluster");
+//        }
+//
+//      }
+//    }
+  }
+
+  /**
+   * If the queue isn't empty, switch to the next one
+   * Else if this is a recovered queue, it means we're done!
+   * Else we'll just continue to try reading the log file
+   * @return true if we're done with the current file, false if we should
+   * continue trying to read from it
+   */
+  protected boolean processEndOfFile() {
+    // REENABLE
+//    if (this.queue.size() != 0) {
+//      this.currentPath = null;
+//      this.position = 0;
+//      return true;
+//    } else if (this.queueRecovered) {
+//      this.manager.closeRecoveredQueue(this);
+//      this.abort();
+//      return true;
+//    }
+    return false;
+  }
+
+  public void startup() {
+    String n = Thread.currentThread().getName();
+    Thread.UncaughtExceptionHandler handler =
+        new Thread.UncaughtExceptionHandler() {
+          public void uncaughtException(final Thread t, final Throwable e) {
+            LOG.fatal("Set stop flag in " + t.getName(), e);
+            abort();
+          }
+        };
+    Threads.setDaemonThreadRunning(
+        this, n + ".replicationSource," + clusterId, handler);
+  }
+
+  /**
+   * Hastily stop the replication, then wait for shutdown
+   */
+  private void abort() {
+    LOG.info("abort");
+    this.running = false;
+    terminate();
+  }
+
+  public void terminate() {
+    LOG.info("terminate");
+    Threads.shutdown(this, this.sleepForRetries);
+  }
+
+  /**
+   * Get a new region server at random from this peer
+   * @return a connection to a randomly chosen region server in the peer cluster
+   * @throws IOException
+   */
+  private HRegionInterface getRS() throws IOException {
+    if (this.currentPeers.size() == 0) {
+      throw new IOException(this.peerClusterZnode + " has 0 region servers");
+    }
+    HServerAddress address =
+        currentPeers.get(random.nextInt(this.currentPeers.size()));
+    return this.conn.getHRegionConnection(address);
+  }
+
+  /**
+   * Check if the slave is down by trying to establish a connection
+   * @return true if down, false if up
+   * @throws InterruptedException
+   */
+  public boolean isSlaveDown() throws InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    Thread pingThread = new Thread() {
+      public void run() {
+        try {
+          HRegionInterface rrs = getRS();
+          // Dummy call which should fail
+          rrs.getHServerInfo();
+          latch.countDown();
+        } catch (IOException ex) {
+          LOG.info("Slave cluster looks down: " + ex.getMessage());
+        }
+      }
+    };
+    pingThread.start();
+    // await returns true if countDown happened before the timeout
+    boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
+    pingThread.interrupt();
+    return down;
+  }
+
+  /**
+   * Get the id that the source is replicating to
+   *
+   * @return peer cluster id
+   */
+  public String getPeerClusterZnode() {
+    return this.peerClusterZnode;
+  }
+
+  /**
+   * Get the path of the current HLog
+   * @return current hlog's path
+   */
+  public Path getCurrentPath() {
+    return this.currentPath;
+  }
+
+  /**
+   * Comparator used to compare logs together based on their start time
+   */
+  public static class LogsComparator implements Comparator<Path> {
+
+    @Override
+    public int compare(Path o1, Path o2) {
+      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return true;
+    }
+
+    /**
+     * Split a path to get the start time
+     * For example: 10.20.20.171%3A60020.1277499063250
+     * @param p path to split
+     * @return start time
+     */
+    private long getTS(Path p) {
+      String[] parts = p.getName().split("\\.");
+      return Long.parseLong(parts[parts.length-1]);
+    }
+  }
+
+}
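
The LogsComparator above orders hlogs by the start timestamp that follows the last dot of the file name, so a recovered queue is drained oldest-first. Below is a minimal, illustrative sketch of that ordering; it assumes the enclosing class is the ReplicationSource referenced by the manager's factory further down, and the example class name and paths are made up for the sketch.

import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;

public class LogsComparatorExample {
  public static void main(String[] args) {
    // Hlog names end with their start timestamp: <encoded address>.<timestamp>
    Path[] logs = {
        new Path("/hbase/.oldlogs/10.20.20.171%3A60020.1277499063250"),
        new Path("/hbase/.oldlogs/10.20.20.171%3A60020.1277499001111")
    };
    Arrays.sort(logs, new ReplicationSource.LogsComparator());
    // The oldest hlog (smallest start timestamp) now comes first
    System.out.println(Arrays.asList(logs));
  }
}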

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Interface that defines a replication source
+ */
+public interface ReplicationSourceInterface {
+
+  /**
+   * Initializer for the source
+   * @param conf the configuration to use
+   * @param fs the file system to use
+   * @param manager the manager to use
+   * @param stopper the stopper object for this region server
+   * @param replicating the status of the replication on this cluster
+   * @param peerClusterId the id of the peer cluster
+   * @throws IOException
+   */
+  public void init(final Configuration conf,
+                   final FileSystem fs,
+                   final ReplicationSourceManager manager,
+                   final AtomicBoolean stopper,
+                   final AtomicBoolean replicating,
+                   final String peerClusterId) throws IOException;
+
+  /**
+   * Add a log to the list of logs to replicate
+   * @param log path to the log to replicate
+   */
+  public void enqueueLog(Path log);
+
+  /**
+   * Get the current log that's replicated
+   * @return the current log
+   */
+  public Path getCurrentPath();
+
+  /**
+   * Start the replication
+   */
+  public void startup();
+
+  /**
+   * End the replication
+   */
+  public void terminate();
+
+  /**
+   * Get the id that the source is replicating to
+   *
+   * @return peer cluster id
+   */
+  public String getPeerClusterZnode();
+}
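
Since the manager instantiates its sources through the replication.replicationsource.implementation setting (see the getReplicationSource factory in ReplicationSourceManager below), any class implementing this interface can be plugged in. The following is only a sketch of such a plug-in: the class name and its no-op behavior are invented for illustration and are not part of this commit.

package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative no-op source; it only remembers what it was handed. */
public class NoopReplicationSource implements ReplicationSourceInterface {

  private String peerClusterId;
  private Path currentPath;

  @Override
  public void init(final Configuration conf, final FileSystem fs,
      final ReplicationSourceManager manager, final AtomicBoolean stopper,
      final AtomicBoolean replicating, final String peerClusterId)
      throws IOException {
    this.peerClusterId = peerClusterId;
  }

  @Override
  public void enqueueLog(Path log) {
    // A real source would queue this hlog for shipping; we only keep a reference
    this.currentPath = log;
  }

  @Override
  public Path getCurrentPath() {
    return this.currentPath;
  }

  @Override
  public void startup() {
    // A real source would start its reader/shipper thread here
  }

  @Override
  public void terminate() {
    // Nothing to shut down
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterId;
  }
}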

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,383 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is responsible for managing all the replication
+ * sources. There are two classes of sources:
+ * <li> Normal sources are persistent and there is one per peer cluster</li>
+ * <li> Old sources are recovered from a failed region server and their
+ * only goal is to finish replicating the HLog queue it had up in ZK</li>
+ *
+ * When a region server dies, this class uses a watcher to get notified and it
+ * tries to grab a lock in order to transfer all of its queues to local
+ * old sources.
+ */
+public class ReplicationSourceManager implements LogActionsListener {
+
+  @Override
+  public void logRolled(Path newFile) {
+    // TODO Auto-generated method stub
+    
+  }
+  // REENABLE
+//
+//  private static final Log LOG =
+//      LogFactory.getLog(ReplicationSourceManager.class);
+//  // List of all the sources that read this RS's logs
+//  private final List<ReplicationSourceInterface> sources;
+//  // List of all the sources we got from died RSs
+//  private final List<ReplicationSourceInterface> oldsources;
+//  // Indicates if we are currently replicating
+//  private final AtomicBoolean replicating;
+//  // Helper for zookeeper
+//  private final ReplicationZookeeperWrapper zkHelper;
+//  // Indicates if the region server is closing
+//  private final AtomicBoolean stopper;
+//  // All logs we are currently tracking
+//  private final SortedSet<String> hlogs;
+//  private final Configuration conf;
+//  private final FileSystem fs;
+//  // The path to the latest log we saw, for new coming sources
+//  private Path latestPath;
+//  // List of all the other region servers in this cluster
+//  private final List<String> otherRegionServers;
+//  // Path to the hlogs directories
+//  private final Path logDir;
+//  // Path to the hlog archive
+//  private final Path oldLogDir;
+//
+//  /**
+//   * Creates a replication manager and sets the watch on all the other
+//   * registered region servers
+//   * @param zkHelper the zk helper for replication
+//   * @param conf the configuration to use
+//   * @param stopper the stopper object for this region server
+//   * @param fs the file system to use
+//   * @param replicating the status of the replication on this cluster
+//   * @param logDir the directory that contains all hlog directories of live RSs
+//   * @param oldLogDir the directory where old logs are archived
+//   */
+//  public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+//                                  final Configuration conf,
+//                                  final AtomicBoolean stopper,
+//                                  final FileSystem fs,
+//                                  final AtomicBoolean replicating,
+//                                  final Path logDir,
+//                                  final Path oldLogDir) {
+//    this.sources = new ArrayList<ReplicationSourceInterface>();
+//    this.replicating = replicating;
+//    this.zkHelper = zkHelper;
+//    this.stopper = stopper;
+//    this.hlogs = new TreeSet<String>();
+//    this.oldsources = new ArrayList<ReplicationSourceInterface>();
+//    this.conf = conf;
+//    this.fs = fs;
+//    this.logDir = logDir;
+//    this.oldLogDir = oldLogDir;
+//    List<String> otherRSs =
+//        this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+//    this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+//  }
+//
+//  /**
+//   * Provide the id of the peer and a log key and this method will figure which
+//   * hlog it belongs to and will record, for this region server, the current
+//   * replication position in zookeeper, deleting older entries along the way.
+//   * It will also clean old logs from the queue.
+//   * @param log Path to the log currently being replicated
+//   * @param id id of the peer cluster
+//   * @param position current location in the log
+//   * @param queueRecovered indicates if this queue comes from another region server
+//   */
+//  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+//    String key = log.getName();
+//    LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
+//    this.zkHelper.writeReplicationStatus(key.toString(), id, position);
+//    synchronized (this.hlogs) {
+//      if (!queueRecovered && this.hlogs.first() != key) {
+//        SortedSet<String> hlogSet = this.hlogs.headSet(key);
+//        LOG.info("Removing " + hlogSet.size() +
+//            " logs in the list: " + hlogSet);
+//        for (String hlog : hlogSet) {
+//          this.zkHelper.removeLogFromList(hlog.toString(), id);
+//        }
+//        hlogSet.clear();
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Adds a normal source per registered peer cluster and tries to process all
+//   * old region server hlog queues
+//   */
+//  public void init() throws IOException {
+//    for (String id : this.zkHelper.getPeerClusters().keySet()) {
+//      ReplicationSourceInterface src = addSource(id);
+//      src.startup();
+//    }
+//    List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+//    synchronized (otherRegionServers) {
+//      LOG.info("Current list of replicators: " + currentReplicators
+//          + " other RSs: " + otherRegionServers);
+//    }
+//    // Look if there's anything to process after a restart
+//    for (String rs : currentReplicators) {
+//      synchronized (otherRegionServers) {
+//        if (!this.otherRegionServers.contains(rs)) {
+//          transferQueues(rs);
+//        }
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Add a new normal source to this region server
+//   * @param id the id of the peer cluster
+//   * @return the created source
+//   * @throws IOException
+//   */
+//  public ReplicationSourceInterface addSource(String id) throws IOException {
+//    ReplicationSourceInterface src =
+//        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+//    this.sources.add(src);
+//    synchronized (this.hlogs) {
+//      if (this.hlogs.size() > 0) {
+//        this.zkHelper.addLogToList(this.hlogs.first(),
+//            this.sources.get(0).getPeerClusterZnode());
+//        src.enqueueLog(this.latestPath);
+//      }
+//    }
+//    return src;
+//  }
+//
+//  /**
+//   * Terminate the replication on this region server
+//   */
+//  public void join() {
+//    if (this.sources.size() == 0) {
+//      this.zkHelper.deleteOwnRSZNode();
+//    }
+//    for (ReplicationSourceInterface source : this.sources) {
+//      source.terminate();
+//    }
+//  }
+//
+//  /**
+//   * Get a copy of the hlogs of the first source on this rs
+//   * @return a sorted set of hlog names
+//   */
+//  protected SortedSet<String> getHLogs() {
+//    return new TreeSet(this.hlogs);
+//  }
+//
+//  /**
+//   * Get a list of all the normal sources of this rs
+//   * @return list of all sources
+//   */
+//  public List<ReplicationSourceInterface> getSources() {
+//    return this.sources;
+//  }
+//
+//  @Override
+//  public void logRolled(Path newLog) {
+//    if (this.sources.size() > 0) {
+//      this.zkHelper.addLogToList(newLog.getName(),
+//          this.sources.get(0).getPeerClusterZnode());
+//    }
+//    synchronized (this.hlogs) {
+//      this.hlogs.add(newLog.getName());
+//    }
+//    this.latestPath = newLog;
+//    // This only update the sources we own, not the recovered ones
+//    for (ReplicationSourceInterface source : this.sources) {
+//      source.enqueueLog(newLog);
+//    }
+//  }
+//
+//  /**
+//   * Get the ZK help of this manager
+//   * @return the helper
+//   */
+//  public ReplicationZookeeperWrapper getRepZkWrapper() {
+//    return zkHelper;
+//  }
+//
+//  /**
+//   * Factory method to create a replication source
+//   * @param conf the configuration to use
+//   * @param fs the file system to use
+//   * @param manager the manager to use
+//   * @param stopper the stopper object for this region server
+//   * @param replicating the status of the replication on this cluster
+//   * @param peerClusterId the id of the peer cluster
+//   * @return the created source
+//   * @throws IOException
+//   */
+//  public ReplicationSourceInterface getReplicationSource(
+//      final Configuration conf,
+//      final FileSystem fs,
+//      final ReplicationSourceManager manager,
+//      final AtomicBoolean stopper,
+//      final AtomicBoolean replicating,
+//      final String peerClusterId) throws IOException {
+//    ReplicationSourceInterface src;
+//    try {
+//      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
+//          ReplicationSource.class.getCanonicalName()));
+//      src = (ReplicationSourceInterface) c.newInstance();
+//    } catch (Exception e) {
+//      LOG.warn("Passed replication source implemention throws errors, " +
+//          "defaulting to ReplicationSource", e);
+//      src = new ReplicationSource();
+//
+//    }
+//    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+//    return src;
+//  }
+//
+//  /**
+//   * Transfer all the queues of the specified region server to this one.
+//   * First it tries to grab a lock and if it works it will move the
+//   * znodes and finally will delete the old znodes.
+//   *
+//   * It creates one old source for any type of source of the old rs.
+//   * @param rsZnode znode of the region server whose queues are transferred
+//   */
+//  public void transferQueues(String rsZnode) {
+//    // We try to lock that rs' queue directory
+//    if (this.stopper.get()) {
+//      LOG.info("Not transferring queue since we are shutting down");
+//      return;
+//    }
+//    if (!this.zkHelper.lockOtherRS(rsZnode)) {
+//      return;
+//    }
+//    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+//    SortedMap<String, SortedSet<String>> newQueues =
+//        this.zkHelper.copyQueuesFromRS(rsZnode);
+//    if (newQueues == null || newQueues.size() == 0) {
+//      return;
+//    }
+//    this.zkHelper.deleteRsQueues(rsZnode);
+//
+//    for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+//      String peerId = entry.getKey();
+//      try {
+//        ReplicationSourceInterface src = getReplicationSource(this.conf,
+//            this.fs, this, this.stopper, this.replicating, peerId);
+//        this.oldsources.add(src);
+//        for (String hlog : entry.getValue()) {
+//          src.enqueueLog(new Path(this.oldLogDir, hlog));
+//        }
+//        src.startup();
+//      } catch (IOException e) {
+//        // TODO manage it
+//        LOG.error("Failed creating a source", e);
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Clear the references to the specified old source
+//   * @param src source to clear
+//   */
+//  public void closeRecoveredQueue(ReplicationSourceInterface src) {
+//    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+//    this.oldsources.remove(src);
+//    this.zkHelper.deleteSource(src.getPeerClusterZnode());
+//  }
+//
+//  /**
+//   * Watcher used to be notified of the other region server's death
+//   * in the local cluster. It initiates the process to transfer the queues
+//   * if it is able to grab the lock.
+//   */
+//  public class OtherRegionServerWatcher implements Watcher {
+//    @Override
+//    public void process(WatchedEvent watchedEvent) {
+//      LOG.info(" event " + watchedEvent);
+//      if (watchedEvent.getState().equals(Event.KeeperState.Expired) ||
+//          watchedEvent.getState().equals(Event.KeeperState.Disconnected)) {
+//        return;
+//      }
+//
+//      List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
+//      if (newRsList == null) {
+//        return;
+//      } else {
+//        synchronized (otherRegionServers) {
+//          otherRegionServers.clear();
+//          otherRegionServers.addAll(newRsList);
+//        }
+//      }
+//      if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
+//        LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
+//        String[] rsZnodeParts = watchedEvent.getPath().split("/");
+//        transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Get the directory where hlogs are archived
+//   * @return the directory where hlogs are archived
+//   */
+//  public Path getOldLogDir() {
+//    return this.oldLogDir;
+//  }
+//
+//  /**
+//   * Get the directory where hlogs are stored by their RSs
+//   * @return the directory where hlogs are stored by their RSs
+//   */
+//  public Path getLogDir() {
+//    return this.logDir;
+//  }
+//
+//  /**
+//   * Get the handle on the local file system
+//   * @return the handle on the local file system
+//   */
+//  public FileSystem getFs() {
+//    return this.fs;
+//  }
+
+}
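
The (currently commented-out) getReplicationSource factory above reads a fully-qualified class name from the replication.replicationsource.implementation property and falls back to ReplicationSource when the class cannot be instantiated. A minimal sketch of wiring that property up, assuming an illustrative custom source such as the NoopReplicationSource sketched earlier; HBaseConfiguration.create() is only used here to load the usual hbase-site.xml settings.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CustomReplicationSourceConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Point the factory at a custom source; ReplicationSource stays the
    // default when this key is absent or the class cannot be instantiated.
    conf.set("replication.replicationsource.implementation",
        "org.apache.hadoop.hbase.replication.regionserver.NoopReplicationSource");
    System.out.println(conf.get("replication.replicationsource.implementation"));
  }
}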