You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 22:53:16 UTC

svn commit: r990266 [2/2] - in /hbase/branches/0.90_master_rewrite/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ mai...

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Fri Aug 27 20:53:15 2010
@@ -19,82 +19,84 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
 import org.apache.hadoop.hbase.util.Bytes;
-// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-
-import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Replication serves as an umbrella over the setup of replication and
  * is used by HRS.
  */
-public class Replication implements LogEntryVisitor {
-
+public class Replication implements WALObserver {
   private final boolean replication;
-// REENALBE  private final ReplicationSourceManager replicationManager;
+  private final ReplicationSourceManager replicationManager;
   private boolean replicationMaster;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
-// REENALBE  private final ReplicationZookeeperWrapper zkHelper;
+  private final ReplicationZookeeperWrapper zkHelper;
   private final Configuration conf;
   private ReplicationSink replicationSink;
+  // Hosting server
   private final Server server;
 
   /**
    * Instantiate the replication management (if rep is enabled).
-   * @param conf conf to use
-   * @param hsi the info if this region server
+   * @param server Hosting server
    * @param fs handle to the filesystem
+   * @param logDir directory that contains the hlog directories of live region servers
    * @param oldLogDir directory where logs are archived
-   * @param stopper This is set when we are to shut down.
    * @throws IOException
    */
-  public Replication(final Server server, FileSystem fs, Path logDir,
-      Path oldLogDir) throws IOException {
+  public Replication(final Server server, final FileSystem fs,
+      final Path logDir, final Path oldLogDir)
+  throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
-    this.replication =
-        conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+    this.replication = isReplication(this.conf);
     if (replication) {
-      // REENALBE
-//      this.zkHelper = new ReplicationZookeeperWrapper(
-//        ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
-//        this.replicating, hsi.getServerName());
-//      this.replicationMaster = zkHelper.isReplicationMaster();
-//      this.replicationManager = this.replicationMaster ?
-//        new ReplicationSourceManager(zkHelper, conf, stopRequested,
-//          fs, this.replicating, logDir, oldLogDir) : null;
+      this.zkHelper = new ReplicationZookeeperWrapper(server.getZooKeeper(),
+        this.conf, this.replicating, this.server.getServerName());
+      this.replicationMaster = zkHelper.isReplicationMaster();
+      this.replicationManager = this.replicationMaster ?
+        new ReplicationSourceManager(zkHelper, conf, this.server,
+          fs, this.replicating, logDir, oldLogDir) : null;
     } else {
- // REENABLE     replicationManager = null;
- // REENALBE     zkHelper = null;
+      this.replicationManager = null;
+      this.zkHelper = null;
     }
   }
 
   /**
+   * @param c Configuration to look at
+   * @return True if replication is enabled.
+   */
+  public static boolean isReplication(final Configuration c) {
+    // Replication stays off unless the configuration switches it on.
+    final boolean enabled =
+      c.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+    return enabled;
+  }
+
+  /**
    * Join with the replication threads
    */
   public void join() {
     if (this.replication) {
       if (this.replicationMaster) {
-// REENABLE        this.replicationManager.join();
+        this.replicationManager.join();
       }
-// REENABLE      this.zkHelper.deleteOwnRSZNode();
+        this.zkHelper.deleteOwnRSZNode();
     }
   }
 
@@ -117,10 +119,9 @@ public class Replication implements LogE
   public void startReplicationServices() throws IOException {
     if (this.replication) {
       if (this.replicationMaster) {
-        // REENALBE          this.replicationManager.init();
+        this.replicationManager.init();
       } else {
-        this.replicationSink =
-            new ReplicationSink(this.conf, this.server);
+        this.replicationSink = new ReplicationSink(this.conf, this.server);
       }
     }
   }
@@ -130,12 +131,12 @@ public class Replication implements LogE
    * @return the manager if replication is enabled, else returns null
    */
   public ReplicationSourceManager getReplicationManager() {
-    return  null; // REENALBE   replicationManager;
+    return this.replicationManager;
   }
 
   @Override
   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
-                                       WALEdit logEdit) {
+      WALEdit logEdit) {
     NavigableMap<byte[], Integer> scopes =
         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     byte[] family;
@@ -152,13 +153,13 @@ public class Replication implements LogE
     }
   }
 
-  /**
-   * Add this class as a log entry visitor for HLog if replication is enabled
-   * @param hlog log that was add ourselves on
-   */
-  public void addLogEntryVisitor(HLog hlog) {
-    if (replication) {
-      hlog.addLogEntryVisitor(this);
-    }
+  /**
+   * Forwards a log-roll notification to the replication source manager.
+   * The manager is null when replication is disabled or when this cluster
+   * is not the replication master; in that case there is nobody to notify
+   * and the call is a no-op instead of a NullPointerException.
+   * @param p path of the log that was rolled to
+   */
+  @Override
+  public void logRolled(Path p) {
+    ReplicationSourceManager manager = getReplicationManager();
+    if (manager != null) {
+      manager.logRolled(p);
+    }
+  }
+
+  @Override
+  public void logRollRequested() {
+    // Not interested
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Fri Aug 27 20:53:15 2010
@@ -19,13 +19,13 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.Stoppable;
 
 /**
  * Interface that defines a replication source
@@ -45,7 +45,7 @@ public interface ReplicationSourceInterf
   public void init(final Configuration conf,
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
-                   final AtomicBoolean stopper,
+                   final Stoppable stopper,
                    final AtomicBoolean replicating,
                    final String peerClusterId) throws IOException;
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri Aug 27 20:53:15 2010
@@ -25,7 +25,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -50,334 +54,324 @@ import java.util.concurrent.atomic.Atomi
  * tries to grab a lock in order to transfer all the queues in a local
  * old source.
  */
-public class ReplicationSourceManager implements LogActionsListener {
+public class ReplicationSourceManager {
+  private static final Log LOG =
+      LogFactory.getLog(ReplicationSourceManager.class);
+  // List of all the sources that read this RS's logs
+  private final List<ReplicationSourceInterface> sources;
+  // List of all the sources we got from dead RSs (recovered queues)
+  private final List<ReplicationSourceInterface> oldsources;
+  // Indicates if we are currently replicating
+  private final AtomicBoolean replicating;
+  // Helper for zookeeper
+  private final ReplicationZookeeperWrapper zkHelper;
+  // Checked via isStopped() before taking over another RS's queues
+  private final Stoppable stopper;
+  // Names of all logs we are currently tracking; accessed while holding
+  // the monitor of this set
+  private final SortedSet<String> hlogs;
+  // Configuration handed to every source this manager creates
+  private final Configuration conf;
+  // File system handed to every source this manager creates
+  private final FileSystem fs;
+  // The path to the latest log we saw, for newly added sources
+  private Path latestPath;
+  // List of all the other region servers in this cluster; accessed while
+  // holding the monitor of this list
+  private final List<String> otherRegionServers;
+  // Path to the hlogs directories
+  private final Path logDir;
+  // Path to the hlog archive
+  private final Path oldLogDir;
 
-  @Override
-  public void logRolled(Path newFile) {
-    // TODO Auto-generated method stub
-    
-  }
-  // REENABLE
-//
-//  private static final Log LOG =
-//      LogFactory.getLog(ReplicationSourceManager.class);
-//  // List of all the sources that read this RS's logs
-//  private final List<ReplicationSourceInterface> sources;
-//  // List of all the sources we got from died RSs
-//  private final List<ReplicationSourceInterface> oldsources;
-//  // Indicates if we are currently replicating
-//  private final AtomicBoolean replicating;
-//  // Helper for zookeeper
-//  private final ReplicationZookeeperWrapper zkHelper;
-//  // Indicates if the region server is closing
-//  private final AtomicBoolean stopper;
-//  // All logs we are currently trackign
-//  private final SortedSet<String> hlogs;
-//  private final Configuration conf;
-//  private final FileSystem fs;
-//  // The path to the latest log we saw, for new coming sources
-//  private Path latestPath;
-//  // List of all the other region servers in this cluster
-//  private final List<String> otherRegionServers;
-//  // Path to the hlogs directories
-//  private final Path logDir;
-//  // Path to the hlog archive
-//  private final Path oldLogDir;
-//
-//  /**
-//   * Creates a replication manager and sets the watch on all the other
-//   * registered region servers
-//   * @param zkHelper the zk helper for replication
-//   * @param conf the configuration to use
-//   * @param stopper the stopper object for this region server
-//   * @param fs the file system to use
-//   * @param replicating the status of the replication on this cluster
-//   * @param logDir the directory that contains all hlog directories of live RSs
-//   * @param oldLogDir the directory where old logs are archived
-//   */
-//  public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
-//                                  final Configuration conf,
-//                                  final AtomicBoolean stopper,
-//                                  final FileSystem fs,
-//                                  final AtomicBoolean replicating,
-//                                  final Path logDir,
-//                                  final Path oldLogDir) {
-//    this.sources = new ArrayList<ReplicationSourceInterface>();
-//    this.replicating = replicating;
-//    this.zkHelper = zkHelper;
-//    this.stopper = stopper;
-//    this.hlogs = new TreeSet<String>();
-//    this.oldsources = new ArrayList<ReplicationSourceInterface>();
-//    this.conf = conf;
-//    this.fs = fs;
-//    this.logDir = logDir;
-//    this.oldLogDir = oldLogDir;
-//    List<String> otherRSs =
-//        this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
-//    this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
-//  }
-//
-//  /**
-//   * Provide the id of the peer and a log key and this method will figure which
-//   * hlog it belongs to and will log, for this region server, the current
-//   * position. It will also clean old logs from the queue.
-//   * @param log Path to the log currently being replicated from
-//   * replication status in zookeeper. It will also delete older entries.
-//   * @param id id of the peer cluster
-//   * @param position current location in the log
-//   * @param queueRecovered indicates if this queue comes from another region server
-//   */
-//  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
-//    String key = log.getName();
-//    LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
-//    this.zkHelper.writeReplicationStatus(key.toString(), id, position);
-//    synchronized (this.hlogs) {
-//      if (!queueRecovered && this.hlogs.first() != key) {
-//        SortedSet<String> hlogSet = this.hlogs.headSet(key);
-//        LOG.info("Removing " + hlogSet.size() +
-//            " logs in the list: " + hlogSet);
-//        for (String hlog : hlogSet) {
-//          this.zkHelper.removeLogFromList(hlog.toString(), id);
-//        }
-//        hlogSet.clear();
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Adds a normal source per registered peer cluster and tries to process all
-//   * old region server hlog queues
-//   */
-//  public void init() throws IOException {
-//    for (String id : this.zkHelper.getPeerClusters().keySet()) {
-//      ReplicationSourceInterface src = addSource(id);
-//      src.startup();
-//    }
-//    List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
-//    synchronized (otherRegionServers) {
-//      LOG.info("Current list of replicators: " + currentReplicators
-//          + " other RSs: " + otherRegionServers);
-//    }
-//    // Look if there's anything to process after a restart
-//    for (String rs : currentReplicators) {
-//      synchronized (otherRegionServers) {
-//        if (!this.otherRegionServers.contains(rs)) {
-//          transferQueues(rs);
-//        }
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Add a new normal source to this region server
-//   * @param id the id of the peer cluster
-//   * @return the created source
-//   * @throws IOException
-//   */
-//  public ReplicationSourceInterface addSource(String id) throws IOException {
-//    ReplicationSourceInterface src =
-//        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
-//    this.sources.add(src);
-//    synchronized (this.hlogs) {
-//      if (this.hlogs.size() > 0) {
-//        this.zkHelper.addLogToList(this.hlogs.first(),
-//            this.sources.get(0).getPeerClusterZnode());
-//        src.enqueueLog(this.latestPath);
-//      }
-//    }
-//    return src;
-//  }
-//
-//  /**
-//   * Terminate the replication on this region server
-//   */
-//  public void join() {
-//    if (this.sources.size() == 0) {
-//      this.zkHelper.deleteOwnRSZNode();
-//    }
-//    for (ReplicationSourceInterface source : this.sources) {
-//      source.terminate();
-//    }
-//  }
-//
-//  /**
-//   * Get a copy of the hlogs of the first source on this rs
-//   * @return a sorted set of hlog names
-//   */
-//  protected SortedSet<String> getHLogs() {
-//    return new TreeSet(this.hlogs);
-//  }
-//
-//  /**
-//   * Get a list of all the normal sources of this rs
-//   * @return lis of all sources
-//   */
-//  public List<ReplicationSourceInterface> getSources() {
-//    return this.sources;
-//  }
-//
-//  @Override
-//  public void logRolled(Path newLog) {
-//    if (this.sources.size() > 0) {
-//      this.zkHelper.addLogToList(newLog.getName(),
-//          this.sources.get(0).getPeerClusterZnode());
-//    }
-//    synchronized (this.hlogs) {
-//      this.hlogs.add(newLog.getName());
-//    }
-//    this.latestPath = newLog;
-//    // This only update the sources we own, not the recovered ones
-//    for (ReplicationSourceInterface source : this.sources) {
-//      source.enqueueLog(newLog);
-//    }
-//  }
-//
-//  /**
-//   * Get the ZK help of this manager
-//   * @return the helper
-//   */
-//  public ReplicationZookeeperWrapper getRepZkWrapper() {
-//    return zkHelper;
-//  }
-//
-//  /**
-//   * Factory method to create a replication source
-//   * @param conf the configuration to use
-//   * @param fs the file system to use
-//   * @param manager the manager to use
-//   * @param stopper the stopper object for this region server
-//   * @param replicating the status of the replication on this cluster
-//   * @param peerClusterId the id of the peer cluster
-//   * @return the created source
-//   * @throws IOException
-//   */
-//  public ReplicationSourceInterface getReplicationSource(
-//      final Configuration conf,
-//      final FileSystem fs,
-//      final ReplicationSourceManager manager,
-//      final AtomicBoolean stopper,
-//      final AtomicBoolean replicating,
-//      final String peerClusterId) throws IOException {
-//    ReplicationSourceInterface src;
-//    try {
-//      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
-//          ReplicationSource.class.getCanonicalName()));
-//      src = (ReplicationSourceInterface) c.newInstance();
-//    } catch (Exception e) {
-//      LOG.warn("Passed replication source implemention throws errors, " +
-//          "defaulting to ReplicationSource", e);
-//      src = new ReplicationSource();
-//
-//    }
-//    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
-//    return src;
-//  }
-//
-//  /**
-//   * Transfer all the queues of the specified to this region server.
-//   * First it tries to grab a lock and if it works it will move the
-//   * znodes and finally will delete the old znodes.
-//   *
-//   * It creates one old source for any type of source of the old rs.
-//   * @param rsZnode
-//   */
-//  public void transferQueues(String rsZnode) {
-//    // We try to lock that rs' queue directory
-//    if (this.stopper.get()) {
-//      LOG.info("Not transferring queue since we are shutting down");
-//      return;
-//    }
-//    if (!this.zkHelper.lockOtherRS(rsZnode)) {
-//      return;
-//    }
-//    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
-//    SortedMap<String, SortedSet<String>> newQueues =
-//        this.zkHelper.copyQueuesFromRS(rsZnode);
-//    if (newQueues == null || newQueues.size() == 0) {
-//      return;
-//    }
-//    this.zkHelper.deleteRsQueues(rsZnode);
-//
-//    for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
-//      String peerId = entry.getKey();
-//      try {
-//        ReplicationSourceInterface src = getReplicationSource(this.conf,
-//            this.fs, this, this.stopper, this.replicating, peerId);
-//        this.oldsources.add(src);
-//        for (String hlog : entry.getValue()) {
-//          src.enqueueLog(new Path(this.oldLogDir, hlog));
-//        }
-//        src.startup();
-//      } catch (IOException e) {
-//        // TODO manage it
-//        LOG.error("Failed creating a source", e);
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Clear the references to the specified old source
-//   * @param src source to clear
-//   */
-//  public void closeRecoveredQueue(ReplicationSourceInterface src) {
-//    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
-//    this.oldsources.remove(src);
-//    this.zkHelper.deleteSource(src.getPeerClusterZnode());
-//  }
-//
-//  /**
-//   * Watcher used to be notified of the other region server's death
-//   * in the local cluster. It initiates the process to transfer the queues
-//   * if it is able to grab the lock.
-//   */
-//  public class OtherRegionServerWatcher implements Watcher {
-//    @Override
-//    public void process(WatchedEvent watchedEvent) {
-//      LOG.info(" event " + watchedEvent);
-//      if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
-//          watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
-//        return;
-//      }
-//
-//      List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
-//      if (newRsList == null) {
-//        return;
-//      } else {
-//        synchronized (otherRegionServers) {
-//          otherRegionServers.clear();
-//          otherRegionServers.addAll(newRsList);
-//        }
-//      }
-//      if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
-//        LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
-//        String[] rsZnodeParts = watchedEvent.getPath().split("/");
-//        transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Get the directory where hlogs are archived
-//   * @return the directory where hlogs are archived
-//   */
-//  public Path getOldLogDir() {
-//    return this.oldLogDir;
-//  }
-//
-//  /**
-//   * Get the directory where hlogs are stored by their RSs
-//   * @return the directory where hlogs are stored by their RSs
-//   */
-//  public Path getLogDir() {
-//    return this.logDir;
-//  }
-//
-//  /**
-//   * Get the handle on the local file system
-//   * @returnthe handle on the local file system
-//   */
-//  public FileSystem getFs() {
-//    return this.fs;
-//  }
+  /**
+   * Builds a replication manager with empty source lists and an empty set
+   * of tracked hlogs, then asks ZooKeeper for the registered region servers
+   * while installing an {@link OtherRegionServerWatcher} on them.
+   * @param zkHelper the zk helper for replication
+   * @param conf the configuration to use
+   * @param stopper the stopper object for this region server
+   * @param fs the file system to use
+   * @param replicating the status of the replication on this cluster
+   * @param logDir the directory that contains all hlog directories of live RSs
+   * @param oldLogDir the directory where old logs are archived
+   */
+  public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+      final Configuration conf, final Stoppable stopper, final FileSystem fs,
+      final AtomicBoolean replicating, final Path logDir,
+      final Path oldLogDir) {
+    // Collaborators handed in by the caller
+    this.zkHelper = zkHelper;
+    this.conf = conf;
+    this.stopper = stopper;
+    this.fs = fs;
+    this.replicating = replicating;
+    this.logDir = logDir;
+    this.oldLogDir = oldLogDir;
+    // State owned by this manager
+    this.sources = new ArrayList<ReplicationSourceInterface>();
+    this.oldsources = new ArrayList<ReplicationSourceInterface>();
+    this.hlogs = new TreeSet<String>();
+    // A null answer from the helper is treated as "no other region servers"
+    List<String> registered =
+        this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+    if (registered == null) {
+      registered = new ArrayList<String>();
+    }
+    this.otherRegionServers = registered;
+  }
+
+  /**
+   * Writes to ZooKeeper the position reached in the given hlog for the given
+   * peer. For a queue owned by this region server it then removes, both from
+   * ZooKeeper and from the tracked set, every hlog that sorts before the one
+   * being reported.
+   * @param log Path to the log currently being replicated from
+   * @param id id of the peer cluster
+   * @param position current location in the log
+   * @param queueRecovered indicates if this queue comes from another region
+   * server; older logs are never cleaned for such a queue
+   */
+  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+    String key = log.getName();
+    LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
+    this.zkHelper.writeReplicationStatus(key, id, position);
+    synchronized (this.hlogs) {
+      // Compare log names by value, not identity, and tolerate an empty set,
+      // on which first() would throw NoSuchElementException.
+      if (queueRecovered || this.hlogs.isEmpty()
+          || key.equals(this.hlogs.first())) {
+        return;
+      }
+      // headSet is a live view: clearing it drops the entries from hlogs too
+      SortedSet<String> hlogSet = this.hlogs.headSet(key);
+      LOG.info("Removing " + hlogSet.size() +
+          " logs in the list: " + hlogSet);
+      for (String hlog : hlogSet) {
+        this.zkHelper.removeLogFromList(hlog, id);
+      }
+      hlogSet.clear();
+    }
+  }
+
+  /**
+   * Creates and starts one source for each peer cluster known to the zk
+   * helper, then claims the queues of every listed replicator that is not
+   * among the other region servers currently known to this manager.
+   * @throws IOException if a source cannot be added
+   */
+  public void init() throws IOException {
+    for (String peerId : this.zkHelper.getPeerClusters().keySet()) {
+      addSource(peerId).startup();
+    }
+    final List<String> replicators = this.zkHelper.getListOfReplicators(null);
+    synchronized (this.otherRegionServers) {
+      LOG.info("Current list of replicators: " + replicators
+          + " other RSs: " + this.otherRegionServers);
+    }
+    // Look if there's anything to process after a restart
+    for (String replicator : replicators) {
+      synchronized (this.otherRegionServers) {
+        if (this.otherRegionServers.contains(replicator)) {
+          continue;
+        }
+        transferQueues(replicator);
+      }
+    }
+  }
+
+  /**
+   * Creates a source for the given peer cluster and registers it in the list
+   * of normal sources. If hlogs are already tracked, the first tracked hlog
+   * is added in ZooKeeper under the znode of the first registered source and
+   * the latest known log is queued on the new source.
+   * @param id the id of the peer cluster
+   * @return the created source
+   * @throws IOException if the source cannot be created
+   */
+  public ReplicationSourceInterface addSource(String id) throws IOException {
+    final ReplicationSourceInterface created = getReplicationSource(
+        this.conf, this.fs, this, this.stopper, this.replicating, id);
+    this.sources.add(created);
+    synchronized (this.hlogs) {
+      if (!this.hlogs.isEmpty()) {
+        final String firstLog = this.hlogs.first();
+        final String znode = this.sources.get(0).getPeerClusterZnode();
+        this.zkHelper.addLogToList(firstLog, znode);
+        created.enqueueLog(this.latestPath);
+      }
+    }
+    return created;
+  }
+
+  /**
+   * Stops replication on this region server: when no normal source exists
+   * this server's own znode is deleted, and every normal source is
+   * terminated.
+   */
+  public void join() {
+    if (this.sources.isEmpty()) {
+      this.zkHelper.deleteOwnRSZNode();
+    }
+    for (ReplicationSourceInterface src : this.sources) {
+      src.terminate();
+    }
+  }
+
+  /**
+   * Get a copy of the hlogs of the first source on this rs
+   * @return a sorted set of hlog names; it is a snapshot, so later changes
+   * to the tracked logs are not reflected in it
+   */
+  protected SortedSet<String> getHLogs() {
+    // Copy under the same monitor the other accessors of hlogs hold, so the
+    // set cannot be modified while it is being iterated for the copy.
+    synchronized (this.hlogs) {
+      return new TreeSet<String>(this.hlogs);
+    }
+  }
+
+  /**
+   * Gives access to the normal sources of this rs.
+   * @return the internal list of all normal sources (not a copy)
+   */
+  public List<ReplicationSourceInterface> getSources() {
+    return sources;
+  }
+
+  /**
+   * Registers a new log: adds it in ZooKeeper under the znode of the first
+   * normal source (if any source exists), tracks its name, remembers it as
+   * the latest path and queues it on every normal source.
+   * @param newLog path of the new log
+   */
+  void logRolled(Path newLog) {
+    final String logName = newLog.getName();
+    if (!this.sources.isEmpty()) {
+      final String znode = this.sources.get(0).getPeerClusterZnode();
+      this.zkHelper.addLogToList(logName, znode);
+    }
+    synchronized (this.hlogs) {
+      this.hlogs.add(logName);
+    }
+    this.latestPath = newLog;
+    // Recovered sources are not touched, only the ones we own
+    for (ReplicationSourceInterface src : this.sources) {
+      src.enqueueLog(newLog);
+    }
+  }
+
+  /**
+   * Gives access to the ZooKeeper helper this manager was built with.
+   * @return the helper
+   */
+  public ReplicationZookeeperWrapper getRepZkWrapper() {
+    return this.zkHelper;
+  }
 
+  /**
+   * Factory method for replication sources. The implementation class is read
+   * from the "replication.replicationsource.implementation" property and
+   * instantiated reflectively; if that fails in any way a plain
+   * ReplicationSource is used instead. The source is initialized before it
+   * is returned.
+   * @param conf the configuration to use
+   * @param fs the file system to use
+   * @param manager the manager to use
+   * @param stopper the stopper object for this region server
+   * @param replicating the status of the replication on this cluster
+   * @param peerClusterId the id of the peer cluster
+   * @return the created source
+   * @throws IOException if initializing the source fails
+   */
+  public ReplicationSourceInterface getReplicationSource(
+      final Configuration conf,
+      final FileSystem fs,
+      final ReplicationSourceManager manager,
+      final Stoppable stopper,
+      final AtomicBoolean replicating,
+      final String peerClusterId) throws IOException {
+    ReplicationSourceInterface source;
+    try {
+      final String implName = conf.get(
+          "replication.replicationsource.implementation",
+          ReplicationSource.class.getCanonicalName());
+      final Class<?> implClass = Class.forName(implName);
+      source = (ReplicationSourceInterface) implClass.newInstance();
+    } catch (Exception e) {
+      LOG.warn("Passed replication source implemention throws errors, " +
+          "defaulting to ReplicationSource", e);
+      source = new ReplicationSource();
+    }
+    source.init(conf, fs, manager, stopper, replicating, peerClusterId);
+    return source;
+  }
+
+  /**
+   * Takes over the replication queues of another region server.
+   * Nothing is done when this server is stopping, when the lock on the
+   * other server cannot be obtained, or when no queue could be copied.
+   * Otherwise the old server's queues are deleted and, for each copied
+   * queue, a source is created, recorded as an old source, fed with the
+   * queue's hlogs resolved against the archive directory, and started.
+   * @param rsZnode znode name of the region server whose queues are claimed
+   */
+  public void transferQueues(String rsZnode) {
+    if (this.stopper.isStopped()) {
+      LOG.info("Not transferring queue since we are shutting down");
+      return;
+    }
+    // Give up if the lock on that rs' queue directory is not obtained
+    final boolean locked = this.zkHelper.lockOtherRS(rsZnode);
+    if (!locked) {
+      return;
+    }
+    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+    final SortedMap<String, SortedSet<String>> copied =
+        this.zkHelper.copyQueuesFromRS(rsZnode);
+    if (copied == null || copied.isEmpty()) {
+      return;
+    }
+    this.zkHelper.deleteRsQueues(rsZnode);
+
+    for (Map.Entry<String, SortedSet<String>> queue : copied.entrySet()) {
+      final String peerId = queue.getKey();
+      final SortedSet<String> hlogNames = queue.getValue();
+      try {
+        final ReplicationSourceInterface recovered = getReplicationSource(
+            this.conf, this.fs, this, this.stopper, this.replicating, peerId);
+        this.oldsources.add(recovered);
+        for (String name : hlogNames) {
+          recovered.enqueueLog(new Path(this.oldLogDir, name));
+        }
+        recovered.startup();
+      } catch (IOException e) {
+        // TODO manage it
+        LOG.error("Failed creating a source", e);
+      }
+    }
+  }
+
+  /**
+   * Forgets a recovered source: it is dropped from the list of old sources
+   * and its znode is deleted through the zk helper.
+   * @param src source to clear
+   */
+  public void closeRecoveredQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+    oldsources.remove(src);
+    zkHelper.deleteSource(src.getPeerClusterZnode());
+  }
+
+  /**
+   * Watcher used to be notified of the other region server's death
+   * in the local cluster. It initiates the process to transfer the queues
+   * if it is able to grab the lock.
+   */
+  public class OtherRegionServerWatcher implements Watcher {
+    /**
+     * Refreshes the list of other region servers (re-installing this
+     * watcher) and, when the event reports a deleted znode, tries to take
+     * over the queues of the region server named by the last path element.
+     * Events delivered in the Expired or Disconnected state are ignored.
+     * @param watchedEvent the ZooKeeper event to handle
+     */
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      LOG.info(" event " + watchedEvent);
+      // The connection state is carried by getState(); getType() returns an
+      // EventType, which can never equal a KeeperState constant.
+      Event.KeeperState state = watchedEvent.getState();
+      if (state == Event.KeeperState.Expired ||
+          state == Event.KeeperState.Disconnected) {
+        return;
+      }
+
+      List<String> newRsList = zkHelper.getRegisteredRegionServers(this);
+      if (newRsList == null) {
+        return;
+      }
+      synchronized (otherRegionServers) {
+        otherRegionServers.clear();
+        otherRegionServers.addAll(newRsList);
+      }
+      if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
+        LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
+        String[] rsZnodeParts = watchedEvent.getPath().split("/");
+        transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+      }
+    }
+  }
+
+  /**
+   * Gives the archive directory this manager was built with.
+   * @return the directory where hlogs are archived
+   */
+  public Path getOldLogDir() {
+    return oldLogDir;
+  }
+
+  /**
+   * Gives the log directory this manager was built with.
+   * @return the directory where hlogs are stored by their RSs
+   */
+  public Path getLogDir() {
+    return logDir;
+  }
+
+  /**
+   * Gives the file system this manager was built with.
+   * @return the handle on the file system
+   */
+  public FileSystem getFs() {
+    return fs;
+  }
 }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java Fri Aug 27 20:53:15 2010
@@ -149,8 +149,7 @@ class HMerge {
       Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() +
           HConstants.HREGION_LOGDIR_NAME);
       Path oldLogDir = new Path(tabledir, HConstants.HREGION_OLDLOGDIR_NAME);
-      this.hlog =
-        new HLog(fs, logdir, oldLogDir, conf, null);
+      this.hlog = new HLog(fs, logdir, oldLogDir, conf);
     }
 
     void process() throws IOException {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java Fri Aug 27 20:53:15 2010
@@ -103,7 +103,7 @@ public class MetaUtils {
           HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis());
       Path oldLogDir = new Path(this.fs.getHomeDirectory(),
           HConstants.HREGION_OLDLOGDIR_NAME);
-      this.log = new HLog(this.fs, logdir, oldLogDir, this.conf, null);
+      this.log = new HLog(this.fs, logdir, oldLogDir, this.conf);
     }
     return this.log;
   }

Modified: hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp (original)
+++ hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp Fri Aug 27 20:53:15 2010
@@ -50,7 +50,7 @@
 <table>
 <tr><th>Region Name</th><th>Start Key</th><th>End Key</th><th>Metrics</th></tr>
 <%   for (HRegionInfo r: onlineRegions) { 
-        HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getRegionName());
+        HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getEncodedName());
  %>
 <tr><td><%= r.getRegionNameAsString() %></td>
     <td><%= Bytes.toStringBinary(r.getStartKey()) %></td><td><%= Bytes.toStringBinary(r.getEndKey()) %></td>

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java Fri Aug 27 20:53:15 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.master;
 import static org.junit.Assert.assertEquals;
 
 import java.net.URLEncoder;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -31,7 +30,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -102,7 +100,7 @@ public class TestLogsCleaner {
         return this.stopped;
       }
     };
-    LogsCleaner cleaner  = new LogsCleaner(1000, stoppable,c, fs, oldLogDir);
+    LogCleaner cleaner  = new LogCleaner(1000, stoppable, c, fs, oldLogDir);
 
     // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files
     long now = System.currentTimeMillis();

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java Fri Aug 27 20:53:15 2010
@@ -29,7 +29,7 @@ public class InstrumentedSequenceFileLog
   @Override
     public void append(HLog.Entry entry) throws IOException {
       super.append(entry);
-      if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) {
+      if (activateFailure && Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) {
         System.out.println(getClass().getName() + ": I will throw an exception now...");
         throw(new IOException("This exception is instrumented and should only be thrown for testing"));
       }

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Fri Aug 27 20:53:15 2010
@@ -133,7 +133,7 @@ public class TestHLog  {
     final byte [] tableName = Bytes.toBytes(getName());
     final byte [] rowName = tableName;
     Path logdir = new Path(dir, HConstants.HREGION_LOGDIR_NAME);
-    HLog log = new HLog(fs, logdir, oldLogDir, conf, null);
+    HLog log = new HLog(fs, logdir, oldLogDir, conf);
     final int howmany = 3;
     HRegionInfo[] infos = new HRegionInfo[3];
     for(int i = 0; i < howmany; i++) {
@@ -192,7 +192,7 @@ public class TestHLog  {
     out.close();
     in.close();
     Path subdir = new Path(dir, "hlogdir");
-    HLog wal = new HLog(fs, subdir, oldLogDir, conf, null);
+    HLog wal = new HLog(fs, subdir, oldLogDir, conf);
     final int total = 20;
 
     HRegionInfo info = new HRegionInfo(new HTableDescriptor(bytes),
@@ -295,7 +295,7 @@ public class TestHLog  {
         HLog.Entry entry = new HLog.Entry();
         while((entry = reader.next(entry)) != null) {
           HLogKey key = entry.getKey();
-          String region = Bytes.toString(key.getRegionName());
+          String region = Bytes.toString(key.getEncodedRegionName());
           // Assert that all edits are for same region.
           if (previousRegion != null) {
             assertEquals(previousRegion, region);
@@ -325,7 +325,7 @@ public class TestHLog  {
         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
     Path subdir = new Path(dir, "hlogdir");
     Path archdir = new Path(dir, "hlogdir_archive");
-    HLog wal = new HLog(fs, subdir, archdir, conf, null);
+    HLog wal = new HLog(fs, subdir, archdir, conf);
     final int total = 20;
 
     for (int i = 0; i < total; i++) {
@@ -429,7 +429,7 @@ public class TestHLog  {
     final byte [] tableName = Bytes.toBytes("tablename");
     final byte [] row = Bytes.toBytes("row");
     HLog.Reader reader = null;
-    HLog log = new HLog(fs, dir, oldLogDir, conf, null);
+    HLog log = new HLog(fs, dir, oldLogDir, conf);
     try {
       // Write columns named 1, 2, 3, etc. and then values of single byte
       // 1, 2, 3...
@@ -442,10 +442,9 @@ public class TestHLog  {
       }
       HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
         row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
-      final byte [] regionName = info.getRegionName();
       log.append(info, tableName, cols, System.currentTimeMillis());
       long logSeqId = log.startCacheFlush();
-      log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
+      log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId, info.isMetaRegion());
       log.close();
       Path filename = log.computeFilename();
       log = null;
@@ -458,7 +457,7 @@ public class TestHLog  {
         if (entry == null) break;
         HLogKey key = entry.getKey();
         WALEdit val = entry.getEdit();
-        assertTrue(Bytes.equals(regionName, key.getRegionName()));
+        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
         KeyValue kv = val.getKeyValues().get(0);
         assertTrue(Bytes.equals(row, kv.getRow()));
@@ -470,7 +469,7 @@ public class TestHLog  {
         HLogKey key = entry.getKey();
         WALEdit val = entry.getEdit();
         // Assert only one more row... the meta flushed row.
-        assertTrue(Bytes.equals(regionName, key.getRegionName()));
+        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
         KeyValue kv = val.getKeyValues().get(0);
         assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
@@ -498,7 +497,7 @@ public class TestHLog  {
     final byte [] tableName = Bytes.toBytes("tablename");
     final byte [] row = Bytes.toBytes("row");
     Reader reader = null;
-    HLog log = new HLog(fs, dir, oldLogDir, conf, null);
+    HLog log = new HLog(fs, dir, oldLogDir, conf);
     try {
       // Write columns named 1, 2, 3, etc. and then values of single byte
       // 1, 2, 3...
@@ -513,7 +512,7 @@ public class TestHLog  {
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
       log.append(hri, tableName, cols, System.currentTimeMillis());
       long logSeqId = log.startCacheFlush();
-      log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
+      log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
       log.close();
       Path filename = log.computeFilename();
       log = null;
@@ -524,7 +523,7 @@ public class TestHLog  {
       int idx = 0;
       for (KeyValue val : entry.getEdit().getKeyValues()) {
         assertTrue(Bytes.equals(hri.getRegionName(),
-          entry.getKey().getRegionName()));
+          entry.getKey().getEncodedRegionName()));
         assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
         assertTrue(Bytes.equals(row, val.getRow()));
         assertEquals((byte)(idx + '0'), val.getValue()[0]);
@@ -537,7 +536,7 @@ public class TestHLog  {
       assertEquals(1, entry.getEdit().size());
       for (KeyValue val : entry.getEdit().getKeyValues()) {
         assertTrue(Bytes.equals(hri.getRegionName(),
-          entry.getKey().getRegionName()));
+          entry.getKey().getEncodedRegionName()));
         assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
         assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
         assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
@@ -564,9 +563,9 @@ public class TestHLog  {
     final int COL_COUNT = 10;
     final byte [] tableName = Bytes.toBytes("tablename");
     final byte [] row = Bytes.toBytes("row");
-    HLog log = new HLog(fs, dir, oldLogDir, conf, null);
-    DumbLogEntriesVisitor visitor = new DumbLogEntriesVisitor();
-    log.addLogEntryVisitor(visitor);
+    HLog log = new HLog(fs, dir, oldLogDir, conf);
+    DumbWALObserver visitor = new DumbWALObserver();
+    log.registerWALActionsListener(visitor);
     long timestamp = System.currentTimeMillis();
     HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
@@ -578,7 +577,7 @@ public class TestHLog  {
       log.append(hri, tableName, cols, System.currentTimeMillis());
     }
     assertEquals(COL_COUNT, visitor.increments);
-    log.removeLogEntryVisitor(visitor);
+    log.unregisterWALActionsListener(visitor);
     WALEdit cols = new WALEdit();
     cols.add(new KeyValue(row, Bytes.toBytes("column"),
         Bytes.toBytes(Integer.toString(11)),
@@ -587,8 +586,7 @@ public class TestHLog  {
     assertEquals(COL_COUNT, visitor.increments);
   }
 
-  static class DumbLogEntriesVisitor implements LogEntryVisitor {
-
+  static class DumbWALObserver implements WALObserver {
     int increments = 0;
 
     @Override
@@ -596,5 +594,17 @@ public class TestHLog  {
                                          WALEdit logEdit) {
       increments++;
     }
+
+    @Override
+    public void logRolled(Path newFile) {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public void logRollRequested() {
+      // TODO Auto-generated method stub
+      
+    }
   }
 }

Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java?rev=990266&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java Fri Aug 27 20:53:15 2010
@@ -0,0 +1,135 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that registered {@link WALObserver}s are told about log rolls while an HLog is in use
+ */
+public class TestWALObserver {
+  protected static final Log LOG = LogFactory.getLog(TestWALObserver.class);
+
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private final static byte[] SOME_BYTES =  Bytes.toBytes("t");
+  private static FileSystem fs;
+  private static Path oldLogDir;
+  private static Path logDir;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("hbase.regionserver.maxlogs", 5);
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(HBaseTestingUtility.getTestDir(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(HBaseTestingUtility.getTestDir(),
+        HConstants.HREGION_LOGDIR_NAME);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    setUp();
+  }
+
+  /**
+   * Append 20 dummy edits and roll the log after every second insert. We
+   * should end up with 10 rolled files (plus the roll called in
+   * the constructor). Also test adding a listener while it's running.
+   */
+  @Test
+  public void testActionListener() throws Exception {
+    DummyWALObserver observer = new DummyWALObserver();
+    List<WALObserver> list = new ArrayList<WALObserver>();
+    list.add(observer);
+    DummyWALObserver laterobserver = new DummyWALObserver();
+    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, list, null);
+    HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES),
+        SOME_BYTES, SOME_BYTES, false);
+
+    for (int i = 0; i < 20; i++) {
+      byte[] b = Bytes.toBytes(i+"");
+      KeyValue kv = new KeyValue(b,b,b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      HLogKey key = new HLogKey(b,b, 0, 0);
+      hlog.append(hri, key, edit);
+      if (i == 10) {
+        hlog.registerWALActionsListener(laterobserver);
+      }
+      if (i % 2 == 0) {
+        hlog.rollWriter();
+      }
+    }
+    assertEquals(11, observer.logRollCounter); // 10 explicit rolls + 1 expected from the constructor
+    assertEquals(5, laterobserver.logRollCounter); // rolls at i = 10, 12, 14, 16, 18
+  }
+
+  /**
+   * Counts {@code logRolled} callbacks; the other callbacks are ignored
+   */
+  static class DummyWALObserver implements WALObserver {
+    public int logRollCounter = 0;
+
+    @Override
+    public void logRolled(Path newFile) {
+      logRollCounter++;
+    }
+
+    @Override
+    public void logRollRequested() {
+      // Not interested
+    }
+
+    @Override
+    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+        WALEdit logEdit) {
+      // Not interested
+      
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri Aug 27 20:53:15 2010
@@ -482,10 +482,10 @@ public class TestWALReplay {
    * @throws IOException
    */
   private HLog createWAL(final Configuration c) throws IOException {
-    HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c, null);
+    HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
     // Set down maximum recovery so we dfsclient doesn't linger retrying something
     // long gone.
     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
     return wal;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java Fri Aug 27 20:53:15 2010
@@ -248,7 +248,7 @@ public class TestMergeTool extends HBase
       System.currentTimeMillis());
     LOG.info("Creating log " + logPath.toString());
     Path oldLogDir = new Path("/tmp", HConstants.HREGION_OLDLOGDIR_NAME);
-    HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf, null);
+    HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf);
     try {
        // Merge Region 0 and Region 1
       HRegion merged = mergeAndVerify("merging regions 0 and 1",