You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 22:53:16 UTC

svn commit: r990266 [1/2] - in /hbase/branches/0.90_master_rewrite/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ mai...

Author: stack
Date: Fri Aug 27 20:53:15 2010
New Revision: 990266

URL: http://svn.apache.org/viewvc?rev=990266&view=rev
Log:

Fixup around how replication is set up in HRS.  Merged the 3 interfaces that
were interested in listening to WAL events, and that each registered themselves
in different ways, into a single WAL observer.  Changed keying of Maps and
recording of region names in the likes of HLogKeys to be encoded name
rather than full region name.

M src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
M src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
  Constructor for HLog changed.
M src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
  Renamed as TestWALObserver
M src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java
  Renamed and fixed up.
M src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
  HLogKeys now have encoded names of regions rather than full region name.
M src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
  Encoded names of regions rather than full names.
M src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
  Formatting.
M src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
  Javadoc.
M src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  Cleanup around replication, wal log setup and log roller setup.
  Cast all as observers and allow others to register themselves as observers.
M src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
  Cast as a WALObserver.
  Use new Server and RegionServerServices interfaces rather than refer
  to RegionServer directly (makes this class easier to test).
M src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogRollListener.java
  Removed.  Redone as method in WALObserver.
M src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
  Used region encoded name instead of full region name.
M src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
M src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java
  Removed.  Redone as method in WALObserver.
M src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
  Changed listeners and visitors to all use WALObserver instead.
  Add registration.  Also an edit to make sure we use region encoded name
  everywhere rather than full region name.
M src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
  Aggregation of all the interfaces that were interested in WAL log events.
M src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
  Used region encoded name rather than full thing when marking flushes, etc.
M src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
  Encoded region name as bytes.
M src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
  Renamed LogsCleaner as LogCleaner to match the LogCleanerDelegate interface.
M src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
  Rename.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
  Some fixup merging with new master stuff.
   Mostly uncommenting commented-out stuff.  Not done yet.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
  Fixup to make replication registration setup cleaner.  Made this the
  listener for WAL events, forwarding those it's interested in elsewhere when necessary
  (was already listening for one event type).
  Use new Server interface.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
  Use Stoppable instead of AtomicBoolean stop. 
M src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
  Uncommented stuff I'd commented out to make things compile.
M src/main/java/org/apache/hadoop/hbase/util/HMerge.java
M src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
  HLog constructor changed.

Added:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java
Removed:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogRollListener.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
Modified:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
    hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java Fri Aug 27 20:53:15 2010
@@ -148,6 +148,7 @@ public class HRegionInfo extends Version
   //TODO: Move NO_HASH to HStoreFile which is really the only place it is used.
   public static final String NO_HASH = null;
   private volatile String encodedName = NO_HASH;
+  private byte [] encodedNameAsBytes = null;
 
   private void setHashCode() {
     int result = Arrays.hashCode(this.regionName);
@@ -426,6 +427,13 @@ public class HRegionInfo extends Version
     return this.encodedName;
   }
 
+  public synchronized byte [] getEncodedNameAsBytes() {
+    if (this.encodedNameAsBytes == null) {
+      this.encodedNameAsBytes = Bytes.toBytes(getEncodedName());
+    }
+    return this.encodedNameAsBytes;
+  }
+
   /** @return the startKey */
   public byte [] getStartKey(){
     return startKey;

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java?rev=990266&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java Fri Aug 27 20:53:15 2010
@@ -0,0 +1,155 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * This Chore, everytime it runs, will clear the wal logs in the old logs folder
+ * that are deletable for each log cleaner in the chain, in order to limit the
+ * number of deletes it sends, will only delete maximum 20 in a single run.
+ */
+public class LogCleaner extends Chore {
+  static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
+
+  // Max number we can delete on every chore, this is to make sure we don't
+  // issue thousands of delete commands around the same time
+  private final int maxDeletedLogs;
+  private final FileSystem fs;
+  private final Path oldLogDir;
+  private List<LogCleanerDelegate> logCleanersChain;
+  private final Configuration conf;
+
+  /**
+   *
+   * @param p the period of time to sleep between each run
+   * @param s the stopper
+   * @param conf configuration to use
+   * @param fs handle to the FS
+   * @param oldLogDir the path to the archived logs
+   */
+  public LogCleaner(final int p, final Stoppable s,
+                        Configuration conf, FileSystem fs,
+                        Path oldLogDir) {
+    super("LogsCleaner", p, s);
+
+    this.maxDeletedLogs =
+        conf.getInt("hbase.master.logcleaner.maxdeletedlogs", 20);
+    this.fs = fs;
+    this.oldLogDir = oldLogDir;
+    this.conf = conf;
+    this.logCleanersChain = new LinkedList<LogCleanerDelegate>();
+
+    initLogCleanersChain();
+  }
+
+  /*
+   * Initialize the chain of log cleaners from the configuration. The default
+   * three LogCleanerDelegates in this chain are: TimeToLiveLogCleaner,
+   * ReplicationLogCleaner and SnapshotLogCleaner.
+   */
+  private void initLogCleanersChain() {
+    String[] logCleaners = conf.getStrings("hbase.master.logcleaner.plugins");
+    if (logCleaners != null) {
+      for (String className : logCleaners) {
+        LogCleanerDelegate logCleaner = newLogCleaner(className, conf);
+        addLogCleaner(logCleaner);
+      }
+    }
+  }
+
+  /**
+   * A utility method to create new instances of LogCleanerDelegate based
+   * on the class name of the LogCleanerDelegate.
+   * @param className fully qualified class name of the LogCleanerDelegate
+   * @param conf
+   * @return the new instance
+   */
+  public static LogCleanerDelegate newLogCleaner(String className, Configuration conf) {
+    try {
+      Class c = Class.forName(className);
+      LogCleanerDelegate cleaner = (LogCleanerDelegate) c.newInstance();
+      cleaner.setConf(conf);
+      return cleaner;
+    } catch(Exception e) {
+      LOG.warn("Can NOT create LogCleanerDelegate: " + className, e);
+      // skipping if can't instantiate
+      return null;
+    }
+  }
+
+  /**
+   * Add a LogCleanerDelegate to the log cleaner chain. A log file is deletable
+   * if it is deletable for each LogCleanerDelegate in the chain.
+   * @param logCleaner
+   */
+  public void addLogCleaner(LogCleanerDelegate logCleaner) {
+    if (logCleaner != null && !logCleanersChain.contains(logCleaner)) {
+      logCleanersChain.add(logCleaner);
+      LOG.debug("Add log cleaner in chain: " + logCleaner.getClass().getName());
+    }
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      FileStatus[] files = this.fs.listStatus(this.oldLogDir);
+      int nbDeletedLog = 0;
+      FILE: for (FileStatus file : files) {
+        Path filePath = file.getPath();
+        if (HLog.validateHLogFilename(filePath.getName())) {
+          for (LogCleanerDelegate logCleaner : logCleanersChain) {
+            if (!logCleaner.isLogDeletable(filePath) ) {
+              // this log is not deletable, continue to process next log file
+              continue FILE;
+            }
+          }
+          // delete this log file if it passes all the log cleaners
+          this.fs.delete(filePath, true);
+          nbDeletedLog++;
+        } else {
+          LOG.warn("Found a wrongly formated file: "
+              + file.getPath().getName());
+          this.fs.delete(filePath, true);
+          nbDeletedLog++;
+        }
+        if (nbDeletedLog >= maxDeletedLogs) {
+          break;
+        }
+      }
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while cleaning the logs", e);
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Fri Aug 27 20:53:15 2010
@@ -88,7 +88,7 @@ public class ServerManager {
 
   private int minimumServerCount;
 
-  private final LogsCleaner logCleaner;
+  private final LogCleaner logCleaner;
 
   // Reporting to track master metrics.
   private final MasterMetrics metrics;
@@ -134,7 +134,7 @@ public class ServerManager {
     String n = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this.serverMonitorThread,
       n + ".serverMonitor");
-    this.logCleaner = new LogsCleaner(
+    this.logCleaner = new LogCleaner(
       c.getInt("hbase.master.meta.thread.rescanfrequency",60 * 1000),
       master, c, this.services.getMasterFileSystem().getFileSystem(),
       this.services.getMasterFileSystem().getOldLogDir());

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Aug 27 20:53:15 2010
@@ -940,7 +940,7 @@ public class HRegion implements HeapSize
     //     and that all updates to the log for this regionName that have lower
     //     log-sequence-ids can be safely ignored.
     if (wal != null) {
-      wal.completeCacheFlush(getRegionName(),
+      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
         regionInfo.getTableDesc().getName(), completeSequenceId,
         this.getRegionInfo().isMetaRegion());
     }
@@ -1831,7 +1831,7 @@ public class HRegion implements HeapSize
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         if (kv.matchingFamily(HLog.METAFAMILY) ||
-            !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) {
+            !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getRegionName())) {
           skippedEdits++;
           continue;
         }
@@ -2369,7 +2369,7 @@ public class HRegion implements HeapSize
     fs.mkdirs(regionDir);
     HRegion region = HRegion.newHRegion(tableDir,
       new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
-          new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf, null),
+          new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf),
       fs, conf, info, null);
     region.initialize();
     return region;
@@ -3132,7 +3132,7 @@ public class HRegion implements HeapSize
         + EnvironmentEdgeManager.currentTimeMillis());
     final Path oldLogDir = new Path(c.get("hbase.tmp.dir"),
         HConstants.HREGION_OLDLOGDIR_NAME);
-    final HLog log = new HLog(fs, logdir, oldLogDir, c, null);
+    final HLog log = new HLog(fs, logdir, oldLogDir, c);
     try {
       processTable(fs, tableDir, log, c, majorCompact);
      } finally {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 27 20:53:15 2010
@@ -108,6 +108,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -255,7 +256,7 @@ public class HRegionServer implements HR
   // Instance of the hbase executor service.
   private ExecutorService service;
 
-  // Replication services
+  // Replication services. If no replication, this handler will be null.
   private Replication replicationHandler;
 
   /**
@@ -375,9 +376,6 @@ public class HRegionServer implements HR
     // Compaction thread
     this.compactSplitThread = new CompactSplitThread(this);
 
-    // Log rolling thread
-    this.hlogRoller = new LogRoller(this);
-
     // Background thread to check for major compactions; needed if region
     // has not gotten updates in a while. Make it run at a lesser frequency.
     int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY
@@ -701,7 +699,7 @@ public class HRegionServer implements HR
       // Get fs instance used by this RS
       this.fs = FileSystem.get(this.conf);
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
-      this.hlog = setupHLog();
+      this.hlog = setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       startServiceThreads();
@@ -741,12 +739,12 @@ public class HRegionServer implements HR
   }
 
   /**
-   * @param regionName
+   * @param encodedRegionName
    * @return An instance of RegionLoad.
    * @throws IOException
    */
-  public HServerLoad.RegionLoad createRegionLoad(final byte[] regionName) {
-    return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName)));
+  public HServerLoad.RegionLoad createRegionLoad(final String encodedRegionName) {
+    return createRegionLoad(this.onlineRegions.get(encodedRegionName));
   }
 
   /*
@@ -887,30 +885,59 @@ public class HRegionServer implements HR
     return isOnline;
   }
 
-  private HLog setupHLog() throws IOException {
+  /**
+   * Setup WAL log and replication if enabled.
+   * Replication setup is done in here because it wants to be hooked up to WAL.
+   * @return A WAL instance.
+   * @throws IOException
+   */
+  private HLog setupWALAndReplication() throws IOException {
     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Log dir " + logdir);
+      LOG.debug("logdir=" + logdir);
     }
-    if (fs.exists(logdir)) {
-      throw new RegionServerRunningException("region server already "
+    if (this.fs.exists(logdir)) {
+      throw new RegionServerRunningException("Region server already "
           + "running at " + this.serverInfo.getServerName()
           + " because logdir " + logdir.toString() + " exists");
     }
-    this.replicationHandler = new Replication(this, this.fs, logdir, oldLogDir);
-    HLog log = instantiateHLog(logdir, oldLogDir);
-    this.replicationHandler.addLogEntryVisitor(log);
-    return log;
+
+    // Instantiate replication manager if replication enabled.  Pass it the
+    // log directories.
+    this.replicationHandler = Replication.isReplication(this.conf)?
+      new Replication(this, this.fs, logdir, oldLogDir): null;
+    return instantiateHLog(logdir, oldLogDir);
   }
 
-  // instantiate
+  /**
+   * Called by {@link #setupWALAndReplication()} creating WAL instance.
+   * @param logdir
+   * @param oldLogDir
+   * @return WAL instance.
+   * @throws IOException
+   */
   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
-    return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
-      this.replicationHandler != null?
-        this.replicationHandler.getReplicationManager():
-        null,
-        this.serverInfo.getServerAddress().toString());
+    return new HLog(this.fs, logdir, oldLogDir, this.conf,
+      getWALActionListeners(), this.serverInfo.getServerAddress().toString());
+  }
+
+  /**
+   * Called by {@link #instantiateHLog(Path, Path)} setting up WAL instance.
+   * Add any {@link WALObserver}s you want inserted before WAL startup.
+   * @return List of WALActionsListener that will be passed in to
+   * {@link HLog} on construction.
+   */
+  protected List<WALObserver> getWALActionListeners() {
+    List<WALObserver> listeners = new ArrayList<WALObserver>();
+    // Log roller.
+    this.hlogRoller = new LogRoller(this, this);
+    if (this.replicationHandler != null) {
+      listeners = new ArrayList<WALObserver>();
+      // Replication handler is an implementation of WALActionsListener.
+      listeners.add(this.replicationHandler);
+    }
+    return listeners;
   }
 
   protected LogRoller getLogRoller() {
@@ -1056,7 +1083,9 @@ public class HRegionServer implements HR
       }
     }
 
-    this.replicationHandler.startReplicationServices();
+    if (this.replicationHandler != null) {
+      this.replicationHandler.startReplicationServices();
+    }
 
     // Start Server.  This service is like leases in that it internally runs
     // a thread.
@@ -1177,7 +1206,9 @@ public class HRegionServer implements HR
     Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
     this.service.shutdown();
-    this.replicationHandler.join();
+    if (this.replicationHandler != null) {
+      this.replicationHandler.join();
+    }
   }
 
   /**
@@ -1924,7 +1955,7 @@ public class HRegionServer implements HR
   }
 
   @Override
-  public HRegion removeFromOnlineRegions(final String encodedName) {
+  public boolean removeFromOnlineRegions(final String encodedName) {
     this.lock.writeLock().lock();
     HRegion toReturn = null;
     try {
@@ -1932,7 +1963,7 @@ public class HRegionServer implements HR
     } finally {
       this.lock.writeLock().unlock();
     }
-    return toReturn;
+    return toReturn != null;
   }
 
   /**
@@ -2269,7 +2300,9 @@ public class HRegionServer implements HR
   }
 
   @Override
-  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+  public void replicateLogEntries(final HLog.Entry[] entries)
+  throws IOException {
+    if (this.replicationHandler == null) return;
     this.replicationHandler.replicateLogEntries(entries);
   }
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Fri Aug 27 20:53:15 2010
@@ -21,9 +21,15 @@ package org.apache.hadoop.hbase.regionse
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
@@ -37,21 +43,26 @@ import java.util.concurrent.locks.Reentr
  * can be interrupted when there is something to do, rather than the Chore
  * sleep time which is invariant.
  */
-class LogRoller extends Thread implements LogRollListener {
+class LogRoller extends Thread implements WALObserver {
   static final Log LOG = LogFactory.getLog(LogRoller.class);
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
-  private final HRegionServer server;
+  private final Server server;
+  private final RegionServerServices services;
   private volatile long lastrolltime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollperiod;
+  private final int threadWakeFrequency;
 
   /** @param server */
-  public LogRoller(final HRegionServer server) {
+  public LogRoller(final Server server, final RegionServerServices services) {
     super();
     this.server = server;
-    this.rollperiod =
-      this.server.conf.getLong("hbase.regionserver.logroll.period", 3600000);
+    this.services = services;
+    this.rollperiod = this.server.getConfiguration().
+      getLong("hbase.regionserver.logroll.period", 3600000);
+    this.threadWakeFrequency = this.server.getConfiguration().
+      getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
   }
 
   @Override
@@ -64,7 +75,7 @@ class LogRoller extends Thread implement
         if (!periodic) {
           synchronized (rollLog) {
             try {
-              rollLog.wait(server.threadWakeFrequency);
+              rollLog.wait(this.threadWakeFrequency);
             } catch (InterruptedException e) {
               // Fall through
             }
@@ -79,27 +90,21 @@ class LogRoller extends Thread implement
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
       try {
         this.lastrolltime = now;
-        byte [][] regionsToFlush = server.getWAL().rollWriter();
+        // This is array of actual region names.
+        byte [][] regionsToFlush = this.services.getWAL().rollWriter();
         if (regionsToFlush != null) {
           for (byte [] r: regionsToFlush) scheduleFlush(r);
         }
       } catch (FailedLogCloseException e) {
-        LOG.fatal("Forcing server shutdown", e);
-        server.checkFileSystem();
         server.abort("Failed log close in log roller", e);
       } catch (java.net.ConnectException e) {
-        LOG.fatal("Forcing server shutdown", e);
-        server.checkFileSystem();
-        server.abort("Failed connect in log roller", e);
+        server.abort("Failed log close in log roller", e);
       } catch (IOException ex) {
-        LOG.fatal("Log rolling failed with ioe: ",
-          RemoteExceptionHandler.checkIOException(ex));
-        server.checkFileSystem();
         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
-        server.abort("IOE in log roller", ex);
+        server.abort("IOE in log roller",
+          RemoteExceptionHandler.checkIOException(ex));
       } catch (Exception ex) {
         LOG.error("Log rolling failed", ex);
-        server.checkFileSystem();
         server.abort("Log rolling failed", ex);
       } finally {
         rollLog.set(false);
@@ -109,12 +114,15 @@ class LogRoller extends Thread implement
     LOG.info("LogRoller exiting.");
   }
 
+  /**
+   * @param region Encoded name of region to flush.
+   */
   private void scheduleFlush(final byte [] region) {
     boolean scheduled = false;
-    HRegion r = this.server.getOnlineRegion(region);
+    HRegion r = this.services.getFromOnlineRegions(Bytes.toString(region));
     FlushRequester requester = null;
     if (r != null) {
-      requester = this.server.getFlushRequester();
+      requester = this.services.getFlushRequester();
       if (requester != null) {
         requester.requestFlush(r);
         scheduled = true;
@@ -145,4 +153,15 @@ class LogRoller extends Thread implement
       rollLock.unlock();
     }
   }
-}
+
+  @Override
+  public void logRolled(Path newFile) {
+    // Not interested
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+      WALEdit logEdit) {
+    // Not interested.
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java Fri Aug 27 20:53:15 2010
@@ -34,11 +34,14 @@ interface OnlineRegions {
    * This method removes HRegion corresponding to hri from the Map of onlineRegions.
    *
    * @param encodedRegionName
-   * @return the removed HRegion, or null if the HRegion was not in onlineRegions.
+   * @return True if we removed a region from online list.
    */
-  public HRegion removeFromOnlineRegions(String encodedRegionName);
+  public boolean removeFromOnlineRegions(String encodedRegionName);
 
   /**
+   * Return {@link HRegion} instance.
+   * Only works if caller is in same context, in same JVM. HRegion is not
+   * serializable.
    * @param encodedRegionName
    * @return HRegion for the passed encoded <code>encodedRegionName</code> or
    * null if named region is not member of the online regions.

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Fri Aug 27 20:53:15 2010
@@ -217,7 +217,9 @@ class SplitTransaction {
     List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
     this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
 
-    if (or != null) or.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
+    if (or != null) {
+      or.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
+    }
     this.journal.add(JournalEntry.OFFLINED_PARENT);
 
     splitStoreFiles(this.splitdir, hstoreFilesToSplit);

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 27 20:53:15 2010
@@ -137,15 +137,15 @@ public class HLog implements Syncable {
   private final FileSystem fs;
   private final Path dir;
   private final Configuration conf;
-  private final LogRollListener listener;
+  // Listeners that are called on WAL events.
+  private List<WALObserver> listeners =
+    new CopyOnWriteArrayList<WALObserver>();
   private final long optionalFlushInterval;
   private final long blocksize;
   private final int flushlogentries;
   private final String prefix;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
   private final Path oldLogDir;
-  private final List<LogActionsListener> actionListeners =
-      Collections.synchronizedList(new ArrayList<LogActionsListener>());
 
 
   private static Class<? extends Writer> logWriterClass;
@@ -188,7 +188,8 @@ public class HLog implements Syncable {
     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
   /*
-   * Map of regions to first sequence/edit id in their memstore.
+   * Map of regions to oldest unflushed sequence/edit id in their memstore.
+   * Key is encoded region name.
    */
   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
@@ -229,9 +230,6 @@ public class HLog implements Syncable {
    */
   private final LogSyncer logSyncerThread;
 
-  private final List<LogEntryVisitor> logEntryVisitors =
-      new CopyOnWriteArrayList<LogEntryVisitor>();
-
   /**
    * Pattern used to validate a HLog file name
    */
@@ -279,19 +277,18 @@ public class HLog implements Syncable {
   }
 
   /**
-   * HLog creating with a null actions listener.
+   * Constructor.
    *
    * @param fs filesystem handle
    * @param dir path to where hlogs are stored
    * @param oldLogDir path to where hlogs are archived
    * @param conf configuration to use
-   * @param listener listerner used to request log rolls
    * @throws IOException
    */
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
-              final Configuration conf, final LogRollListener listener)
+              final Configuration conf)
   throws IOException {
-    this(fs, dir, oldLogDir, conf, listener, null, null);
+    this(fs, dir, oldLogDir, conf, null, null);
   }
 
   /**
@@ -305,22 +302,27 @@ public class HLog implements Syncable {
    * @param dir path to where hlogs are stored
    * @param oldLogDir path to where hlogs are archived
    * @param conf configuration to use
-   * @param listener listerner used to request log rolls
-   * @param actionListener optional listener for hlog actions like archiving
+   * @param listeners Listeners on WAL events. Listeners passed here will
+   * be registered before we do anything else; e.g. the
+   * Constructor {@link #rollWriter()}.
    * @param prefix should always be hostname and port in distributed env and
    *        it will be URL encoded before being used.
    *        If prefix is null, "hlog" will be used
    * @throws IOException
    */
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
-              final Configuration conf, final LogRollListener listener,
-              final LogActionsListener actionListener, final String prefix)
+    final Configuration conf, final List<WALObserver> listeners,
+    final String prefix)
   throws IOException {
     super();
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
-    this.listener = listener;
+    if (listeners != null) {
+      for (WALObserver i: listeners) {
+        registerWALActionsListener(i);
+      }
+    }
     this.flushlogentries =
       conf.getInt("hbase.regionserver.flushlogentries", 1);
     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
@@ -346,9 +348,6 @@ public class HLog implements Syncable {
       ", enabled=" + this.enabled +
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
-    if (actionListener != null) {
-      addLogActionsListerner(actionListener);
-    }
     // If prefix is null||empty then just name it hlog
     this.prefix = prefix == null || prefix.isEmpty() ?
         "hlog" : URLEncoder.encode(prefix, "UTF8");
@@ -357,22 +356,26 @@ public class HLog implements Syncable {
 
     // handle the reflection necessary to call getNumCurrentReplicas()
     this.getNumCurrentReplicas = null;
-    if(this.hdfs_out != null) {
+    Exception exception = null;
+    if (this.hdfs_out != null) {
       try {
         this.getNumCurrentReplicas = this.hdfs_out.getClass().
           getMethod("getNumCurrentReplicas", new Class<?> []{});
         this.getNumCurrentReplicas.setAccessible(true);
       } catch (NoSuchMethodException e) {
         // Thrown if getNumCurrentReplicas() function isn't available
+        exception = e;
       } catch (SecurityException e) {
         // Thrown if we can't get access to getNumCurrentReplicas()
+        exception = e;
         this.getNumCurrentReplicas = null; // could happen on setAccessible()
       }
     }
-    if(this.getNumCurrentReplicas != null) {
+    if (this.getNumCurrentReplicas != null) {
       LOG.info("Using getNumCurrentReplicas--HDFS-826");
     } else {
-      LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
+      LOG.info("getNumCurrentReplicas--HDFS-826 not available; hdfs_out=" +
+        this.hdfs_out + ", exception=" + exception.getMessage());
     }
 
     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
@@ -380,6 +383,14 @@ public class HLog implements Syncable {
         Thread.currentThread().getName() + ".logSyncer");
   }
 
+  public void registerWALActionsListener (final WALObserver listener) {
+    this.listeners.add(listener);
+  }
+
+  public boolean unregisterWALActionsListener(final WALObserver listener) {
+    return this.listeners.remove(listener);
+  }
+
   /**
    * @return Current state of the monotonically increasing file id.
    */
@@ -431,7 +442,8 @@ public class HLog implements Syncable {
    * for the lock on this and consequently never release the cacheFlushLock
    *
    * @return If lots of logs, flush the returned regions so next time through
-   * we can clean logs. Returns null if nothing to flush.
+   * we can clean logs. Returns null if nothing to flush.  Names are encoded
+   * region names as returned by {@link HRegionInfo#getEncodedNameAsBytes()}
    * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
    * @throws IOException
    */
@@ -477,14 +489,14 @@ public class HLog implements Syncable {
         this.numEntries.set(0);
       }
       // Tell our listeners that a new log was created
-      if (!this.actionListeners.isEmpty()) {
-        for (LogActionsListener list : this.actionListeners) {
-          list.logRolled(newPath);
+      if (!this.listeners.isEmpty()) {
+        for (WALObserver i : this.listeners) {
+          i.logRolled(newPath);
         }
       }
       // Can we delete any of the old log files?
       if (this.outputfiles.size() > 0) {
-        if (this.lastSeqWritten.size() <= 0) {
+        if (this.lastSeqWritten.isEmpty()) {
           LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
           // If so, then no new writes have come in since all regions were
           // flushed (and removed from the lastSeqWritten map). Means can
@@ -559,7 +571,8 @@ public class HLog implements Syncable {
   /*
    * Clean up old commit logs.
    * @return If lots of logs, flush the returned region so next time through
-   * we can clean logs. Returns null if nothing to flush.
+   * we can clean logs. Returns null if nothing to flush.  Returns array of
+   * encoded region names to flush.
    * @throws IOException
    */
   private byte [][] cleanOldLogs() throws IOException {
@@ -586,10 +599,12 @@ public class HLog implements Syncable {
     }
 
     // If too many log files, figure which regions we need to flush.
+    // Array is an array of encoded region names.
     byte [][] regions = null;
     int logCount = this.outputfiles.size() - logsToRemove;
     if (logCount > this.maxLogs && this.outputfiles != null &&
         this.outputfiles.size() > 0) {
+      // This is an array of encoded region names.
       regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(),
         this.lastSeqWritten);
       StringBuilder sb = new StringBuilder();
@@ -633,6 +648,10 @@ public class HLog implements Syncable {
     return Collections.min(this.lastSeqWritten.values());
   }
 
+  /**
+   * @param oldestOutstandingSeqNum
+   * @return (Encoded) name of oldest outstanding region.
+   */
   private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
     byte [] oldestRegion = null;
     for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
@@ -789,7 +808,6 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    byte [] regionName = regionInfo.getRegionName();
     synchronized (updateLock) {
       long seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
@@ -798,7 +816,8 @@ public class HLog implements Syncable {
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
+      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
+        Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit);
       this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
@@ -809,8 +828,8 @@ public class HLog implements Syncable {
   }
 
   /**
-   * Append a set of edits to the log. Log edits are keyed by regionName,
-   * rowname, and log-sequence-id.
+   * Append a set of edits to the log. Log edits are keyed by (encoded)
+   * regionName, rowname, and log-sequence-id.
    *
    * Later, if we sort by these keys, we obtain all the relevant edits for a
    * given key-range of the HRegion (TODO). Any edits that do not have a
@@ -835,8 +854,6 @@ public class HLog implements Syncable {
     final long now)
   throws IOException {
     if (edits.isEmpty()) return;
-    
-    byte[] regionName = info.getRegionName();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
@@ -847,8 +864,11 @@ public class HLog implements Syncable {
       // memstore). . When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, seqNum);
-      HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
+      // Use encoded name.  It's shorter, guaranteed unique and a subset of
+      // the actual name.
+      byte [] hriKey = info.getEncodedNameAsBytes();
+      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
+      HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
       doWrite(info, logKey, edits);
       this.numEntries.incrementAndGet();
 
@@ -1045,8 +1065,10 @@ public class HLog implements Syncable {
   }
 
   private void requestLogRoll() {
-    if (this.listener != null) {
-      this.listener.logRollRequested();
+    if (!this.listeners.isEmpty()) {
+      for (WALObserver i: this.listeners) {
+        i.logRollRequested();
+      }
     }
   }
 
@@ -1055,9 +1077,9 @@ public class HLog implements Syncable {
     if (!this.enabled) {
       return;
     }
-    if (!this.logEntryVisitors.isEmpty()) {
-      for (LogEntryVisitor visitor : this.logEntryVisitors) {
-        visitor.visitLogEntryBeforeWrite(info, logKey, logEdit);
+    if (!this.listeners.isEmpty()) {
+      for (WALObserver i: this.listeners) {
+        i.visitLogEntryBeforeWrite(info, logKey, logEdit);
       }
     }
     try {
@@ -1117,14 +1139,13 @@ public class HLog implements Syncable {
    *
    * Protected by cacheFlushLock
    *
-   * @param regionName
+   * @param encodedRegionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
-  public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
-    final long logSeqId,
-    final boolean isMetaRegion)
+  public void completeCacheFlush(final byte [] encodedRegionName,
+      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
   throws IOException {
     try {
       if (this.closed) {
@@ -1133,15 +1154,15 @@ public class HLog implements Syncable {
       synchronized (updateLock) {
         long now = System.currentTimeMillis();
         WALEdit edit = completeCacheFlushLogEdit();
-        HLogKey key = makeKey(regionName, tableName, logSeqId,
+        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
             System.currentTimeMillis());
         this.writer.append(new Entry(key, edit));
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
-        Long seq = this.lastSeqWritten.get(regionName);
+        Long seq = this.lastSeqWritten.get(encodedRegionName);
         if (seq != null && logSeqId >= seq.longValue()) {
-          this.lastSeqWritten.remove(regionName);
+          this.lastSeqWritten.remove(encodedRegionName);
         }
       }
       // sync txn to file system
@@ -1560,7 +1581,7 @@ public class HLog implements Syncable {
     try {
       Entry entry;
       while ((entry = in.next()) != null) {
-        byte[] region = entry.getKey().getRegionName();
+        byte[] region = entry.getKey().getEncodedRegionName();
         LinkedList<Entry> queue = splitLogsMap.get(region);
         if (queue == null) {
           queue = new LinkedList<Entry>();
@@ -1684,7 +1705,7 @@ public class HLog implements Syncable {
     Path tableDir = HTableDescriptor.getTableDir(rootDir,
       logEntry.getKey().getTablename());
     Path regiondir = HRegion.getRegionDir(tableDir,
-      HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+      HRegionInfo.encodeRegionName(logEntry.getKey().getEncodedRegionName()));
     Path dir = getRegionDirRecoveredEditsDir(regiondir);
     if (!fs.exists(dir)) {
       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
@@ -1761,32 +1782,6 @@ public class HLog implements Syncable {
     return new Path(regiondir, RECOVERED_EDITS_DIR);
   }
 
-  /**
-   *
-   * @param visitor
-   */
-  public void addLogEntryVisitor(LogEntryVisitor visitor) {
-    this.logEntryVisitors.add(visitor);
-  }
-
-  /**
-   * 
-   * @param visitor
-   */
-  public void removeLogEntryVisitor(LogEntryVisitor visitor) {
-    this.logEntryVisitors.remove(visitor);
-  }
-
-
-  public void addLogActionsListerner(LogActionsListener list) {
-    LOG.info("Adding a listener");
-    this.actionListeners.add(list);
-  }
-
-  public boolean removeLogActionsListener(LogActionsListener list) {
-    return this.actionListeners.remove(list);
-  }
-
   private static void usage() {
     System.err.println("Usage: java org.apache.hbase.HLog" +
         " {--dump <logfile>... | --split <logdir>...}");
@@ -1848,5 +1843,4 @@ public class HLog implements Syncable {
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
       ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
-
-}
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Fri Aug 27 20:53:15 2010
@@ -39,7 +39,8 @@ import org.apache.hadoop.io.WritableComp
  * associated row.
  */
 public class HLogKey implements WritableComparable<HLogKey> {
-  private byte [] regionName;
+  //  The encoded region name.
+  private byte [] encodedRegionName;
   private byte [] tablename;
   private long logSeqNum;
   // Time at which this edit was written.
@@ -57,27 +58,24 @@ public class HLogKey implements Writable
    * We maintain the tablename mainly for debugging purposes.
    * A regionName is always a sub-table object.
    *
-   * @param regionName  - name of region
+   * @param encodedRegionName Encoded name of the region as returned by
+   * {@link HRegionInfo#getEncodedNameAsBytes()}.
    * @param tablename   - name of table
    * @param logSeqNum   - log sequence number
    * @param now Time at which this edit was written.
    */
-  public HLogKey(final byte [] regionName, final byte [] tablename,
+  public HLogKey(final byte [] encodedRegionName, final byte [] tablename,
       long logSeqNum, final long now) {
-    this.regionName = regionName;
+    this.encodedRegionName = encodedRegionName;
     this.tablename = tablename;
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
   }
 
-  //////////////////////////////////////////////////////////////////////////////
-  // A bunch of accessors
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** @return region name */
-  public byte [] getRegionName() {
-    return regionName;
+  /** @return encoded region name */
+  public byte [] getEncodedRegionName() {
+    return encodedRegionName;
   }
 
   /** @return table name */
@@ -119,7 +117,7 @@ public class HLogKey implements Writable
 
   @Override
   public String toString() {
-    return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
+    return Bytes.toString(tablename) + "/" + Bytes.toString(encodedRegionName) + "/" +
       logSeqNum;
   }
 
@@ -136,7 +134,7 @@ public class HLogKey implements Writable
 
   @Override
   public int hashCode() {
-    int result = Bytes.hashCode(this.regionName);
+    int result = Bytes.hashCode(this.encodedRegionName);
     result ^= this.logSeqNum;
     result ^= this.writeTime;
     result ^= this.clusterId;
@@ -144,7 +142,7 @@ public class HLogKey implements Writable
   }
 
   public int compareTo(HLogKey o) {
-    int result = Bytes.compareTo(this.regionName, o.regionName);
+    int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
     if (result == 0) {
       if (this.logSeqNum < o.logSeqNum) {
         result = -1;
@@ -163,7 +161,7 @@ public class HLogKey implements Writable
   }
 
   public void write(DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, this.regionName);
+    Bytes.writeByteArray(out, this.encodedRegionName);
     Bytes.writeByteArray(out, this.tablename);
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
@@ -171,7 +169,7 @@ public class HLogKey implements Writable
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.regionName = Bytes.readByteArray(in);
+    this.encodedRegionName = Bytes.readByteArray(in);
     this.tablename = Bytes.readByteArray(in);
     this.logSeqNum = in.readLong();
     this.writeTime = in.readLong();
@@ -181,5 +179,4 @@ public class HLogKey implements Writable
       // Means it's an old key, just continue
     }
   }
-
-}
+}
\ No newline at end of file

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java?rev=990266&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java Fri Aug 27 20:53:15 2010
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * Get notification of {@link HLog}/WAL log events. The invocations are inline
+ * so make sure your implementation is fast else you'll slow hbase.
+ */
+public interface WALObserver {
+  /**
+   * The WAL was rolled.
+   * @param newFile the path to the new hlog
+   */
+  public void logRolled(Path newFile);
+
+  /**
+   * A request was made that the WAL be rolled.
+   */
+  public void logRollRequested();
+
+  /**
+  * Called before each write.
+  * @param info
+  * @param logKey
+  * @param logEdit
+  */
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+   WALEdit logEdit);
+}

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java Fri Aug 27 20:53:15 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -70,425 +71,422 @@ import java.util.concurrent.atomic.Atomi
  * </pre>
  */
 public class ReplicationZookeeperWrapper {
-  // REENABLE
+  private static final Log LOG =
+    LogFactory.getLog(ReplicationZookeeperWrapper.class);
+  // Name of the lock znode written under another RS's znode during failover
+  private final static String RS_LOCK_ZNODE = "lock";
+  // Our handle on zookeeper
+  private final ZooKeeperWatcher zookeeper;
+  // Peer clusters, keyed by peer id
+  private final Map<String, ReplicationZookeeperWrapper> peerClusters;
+  // Path to the root replication znode
+  private final String replicationZNode;
+  // Path to the peer clusters znode
+  private final String peersZNode;
+  // Path to the znode that contains all the RSs that replicate
+  private final String rsZNode;
+  // Path to this region server's znode under rsZNode (null without a name)
+  private final String rsServerNameZnode;
+  // Name of the replication state znode
+  private final String replicationStateNodeName;
+  // Whether this RS is part of a master cluster
+  private final boolean replicationMaster;
+  private final Configuration conf;
+  // Is this cluster replicating at the moment?
+  private final AtomicBoolean replicating;
+  // Byte (stored as string here) that identifies this cluster
+  private final String clusterId;
 
-//  private static final Log LOG =
-//      LogFactory.getLog(ReplicationZookeeperWrapper.class);
-//  // Name of znode we use to lock when failover
-//  private final static String RS_LOCK_ZNODE = "lock";
-//  /*
-//  // Our handle on zookeeper
-//  private final ZooKeeperWrapper zookeeperWrapper;
-//  // Map of addresses of peer clusters with their ZKW
-//  private final Map<String, ZooKeeperWrapper> peerClusters;*/
-//  // Path to the root replication znode
-//  private final String replicationZNode;
-//  // Path to the peer clusters znode
-//  private final String peersZNode;
-//  // Path to the znode that contains all RS that replicates
-//  private final String rsZNode;
-//  // Path to this region server's name under rsZNode
-//  private final String rsServerNameZnode;
-//  // Name node if the replicationState znode
-//  private final String replicationStateNodeName;
-//  // If this RS is part of a master cluster
-//  private final boolean replicationMaster;
-//  private final Configuration conf;
-//  // Is this cluster replicating at the moment?
-//  private final AtomicBoolean replicating;
-//  // Byte (stored as string here) that identifies this cluster
-//  private final String clusterId;
-//
-//  /**
-//   * Constructor used by region servers, connects to the peer cluster right away.
-//   *
-//   * @param zookeeperWrapper zkw to wrap
-//   * @param conf             conf to use
-//   * @param replicating    atomic boolean to start/stop replication
-//   * @param rsName      the name of this region server, null if
-//   *                         using RZH only to use the helping methods
-//   * @throws IOException
-//   */
-//  public ReplicationZookeeperWrapper(
-//      ZooKeeperWrapper zookeeperWrapper, Configuration conf,
-//      final AtomicBoolean replicating, String rsName) throws IOException {
-//    this.zookeeperWrapper = zookeeperWrapper;
-//    this.conf = conf;
-//    String replicationZNodeName =
-//        conf.get("zookeeper.znode.replication", "replication");
-//    String peersZNodeName =
-//        conf.get("zookeeper.znode.replication.peers", "peers");
-//    String repMasterZNodeName =
-//        conf.get("zookeeper.znode.replication.master", "master");
-//    this.replicationStateNodeName =
-//        conf.get("zookeeper.znode.replication.state", "state");
-//    String clusterIdZNodeName =
-//        conf.get("zookeeper.znode.replication.clusterId", "clusterId");
-//    String rsZNodeName =
-//        conf.get("zookeeper.znode.replication.rs", "rs");
-//    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
-//          this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
-//          this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
-//
-//    this.peerClusters = new HashMap<String, ZooKeeperWrapper>();
-//    this.replicationZNode = zookeeperWrapper.getZNode(
-//        zookeeperWrapper.getParentZNode(), replicationZNodeName);
-//    this.peersZNode =
-//        zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
-//    this.rsZNode =
-//        zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
-//
-//    this.replicating = replicating;
-//    setReplicating();
-//    String idResult = Bytes.toString(
-//        this.zookeeperWrapper.getData(this.replicationZNode,
-//        clusterIdZNodeName));
-//    this.clusterId =
-//        idResult == null ?
-//            Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
-//    String address = Bytes.toString(
-//        this.zookeeperWrapper.getData(this.replicationZNode,
-//          repMasterZNodeName));
-//    this.replicationMaster = thisCluster.equals(address);
-//    LOG.info("This cluster (" + thisCluster + ") is a "
-//          + (this.replicationMaster ? "master" : "slave") + " for replication" +
-//          ", compared with (" + address + ")");
-//    if (rsName != null) {
-//      this.rsServerNameZnode =
-//          this.zookeeperWrapper.getZNode(rsZNode, rsName);
-//      List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
-//          new ReplicationStatusWatcher());
-//      if (znodes != null) {
-//        for (String znode : znodes) {
-//          connectToPeer(znode);
-//        }
-//      }
-//    } else {
-//      this.rsServerNameZnode = null;
-//    }
-//
-//  }
-//
-//  /**
-//   * Returns all region servers from given peer
-//   *
-//   * @param peerClusterId (byte) the cluster to interrogate
-//   * @return addresses of all region servers
-//   */
-//  public List<HServerAddress> getPeersAddresses(String peerClusterId) {
-//    if (this.peerClusters.size() == 0) {
-//      return new ArrayList<HServerAddress>(0);
-//    }
-//    ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
-//    return zkw == null?
-//        new ArrayList<HServerAddress>(0) : zkw.scanRSDirectory();
-//  }
-//
-//  /**
-//   * This method connects this cluster to another one and registers it
-//   * in this region server's replication znode
-//   * @param peerId id of the peer cluster
-//   */
-//  private void connectToPeer(String peerId) throws IOException {
-//    String[] ensemble =
-//        Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
-//            split(":");
-//    if (ensemble.length != 3) {
-//      throw new IllegalArgumentException("Wrong format of cluster address: " +
-//          this.zookeeperWrapper.getData(this.peersZNode, peerId));
-//    }
-//    Configuration otherConf = new Configuration(this.conf);
-//    otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
-//    otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
-//    otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
-//    ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
-//        "connection to cluster: " + peerId);
-//    zkw.registerListener(new ReplicationStatusWatcher());
-//    this.peerClusters.put(peerId, zkw);
-//    this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
-//        this.rsServerNameZnode, peerId));
-//    LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
-//  }
-//
-//  /**
-//   * This reads the state znode for replication and sets the atomic boolean
-//   */
-//  private void setReplicating() {
-//    String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
-//        this.replicationZNode, this.replicationStateNodeName,
-//        new ReplicationStatusWatcher()));
-//    if (value != null) {
-//      this.replicating.set(value.equals("true"));
-//      LOG.info("Replication is now " + (this.replicating.get() ?
-//          "started" : "stopped"));
-//    }
-//  }
-//
-//  /**
-//   * Add a new log to the list of hlogs in zookeeper
-//   * @param filename name of the hlog's znode
-//   * @param clusterId name of the cluster's znode
-//   */
-//  public void addLogToList(String filename, String clusterId) {
-//    try {
-//      this.zookeeperWrapper.writeZNode(
-//          this.zookeeperWrapper.getZNode(
-//              this.rsServerNameZnode, clusterId), filename, "");
-//    } catch (InterruptedException e) {
-//      LOG.error(e);
-//    } catch (KeeperException e) {
-//      LOG.error(e);
-//    }
-//  }
-//
-//  /**
-//   * Remove a log from the list of hlogs in zookeeper
-//   * @param filename name of the hlog's znode
-//   * @param clusterId name of the cluster's znode
-//   */
-//  public void removeLogFromList(String filename, String clusterId) {
-//    try {
-//      this.zookeeperWrapper.deleteZNode(
-//          this.zookeeperWrapper.getZNode(this.rsServerNameZnode,
-//              this.zookeeperWrapper.getZNode(clusterId, filename)));
-//    } catch (InterruptedException e) {
-//      LOG.error(e);
-//    } catch (KeeperException e) {
-//      LOG.error(e);
-//    }
-//  }
-//
-//  /**
-//   * Set the current position of the specified cluster in the current hlog
-//   * @param filename filename name of the hlog's znode
-//   * @param clusterId clusterId name of the cluster's znode
-//   * @param position the position in the file
-//   * @throws IOException
-//   */
-//  public void writeReplicationStatus(String filename, String clusterId,
-//                                     long position) {
-//    try {
-//      String clusterZNode = this.zookeeperWrapper.getZNode(
-//          this.rsServerNameZnode, clusterId);
-//      this.zookeeperWrapper.writeZNode(clusterZNode, filename,
-//          Long.toString(position));
-//    } catch (InterruptedException e) {
-//      LOG.error(e);
-//    } catch (KeeperException e) {
-//      LOG.error(e);
-//    }
-//  }
-//
-//  /**
-//   * Get a list of all the other region servers in this cluster
-//   * and set a watch
-//   * @param watch the watch to set
-//   * @return a list of server nanes
-//   */
-//  public List<String> getRegisteredRegionServers(Watcher watch) {
-//    return this.zookeeperWrapper.listZnodes(
-//        this.zookeeperWrapper.getRsZNode(), watch);
-//  }
-//
-//  /**
-//   * Get the list of the replicators that have queues, they can be alive, dead
-//   * or simply from a previous run
-//   * @param watch the watche to set
-//   * @return a list of server names
-//   */
-//  public List<String> getListOfReplicators(Watcher watch) {
-//    return this.zookeeperWrapper.listZnodes(rsZNode, watch);
-//  }
-//
-//  /**
-//   * Get the list of peer clusters for the specified server names
-//   * @param rs server names of the rs
-//   * @param watch the watch to set
-//   * @return a list of peer cluster
-//   */
-//  public List<String> getListPeersForRS(String rs, Watcher watch) {
-//    return this.zookeeperWrapper.listZnodes(
-//        zookeeperWrapper.getZNode(rsZNode, rs), watch);
-//  }
-//
-//  /**
-//   * Get the list of hlogs for the specified region server and peer cluster
-//   * @param rs server names of the rs
-//   * @param id peer cluster
-//   * @param watch the watch to set
-//   * @return a list of hlogs
-//   */
-//  public List<String> getListHLogsForPeerForRS(String rs, String id, Watcher watch) {
-//    return this.zookeeperWrapper.listZnodes(
-//        zookeeperWrapper.getZNode(zookeeperWrapper.getZNode(rsZNode, rs), id), watch);
-//  }
-//
-//  /**
-//   * Try to set a lock in another server's znode.
-//   * @param znode the server names of the other server
-//   * @return true if the lock was acquired, false in every other cases
-//   */
-//  public boolean lockOtherRS(String znode) {
-//    try {
-//      this.zookeeperWrapper.writeZNode(
-//          this.zookeeperWrapper.getZNode(this.rsZNode, znode),
-//          RS_LOCK_ZNODE, rsServerNameZnode, true);
-//
-//    } catch (InterruptedException e) {
-//      LOG.error(e);
-//      return false;
-//    } catch (KeeperException e) {
-//      LOG.debug("Won't lock " + znode + " because " + e.getMessage());
-//      // TODO see if the other still exists!!
-//      return false;
-//    }
-//    return true;
-//  }
-//
-//  /**
-//   * This methods copies all the hlogs queues from another region server
-//   * and returns them all sorted per peer cluster (appended with the dead
-//   * server's znode)
-//   * @param znode server names to copy
-//   * @return all hlogs for all peers of that cluster, null if an error occurred
-//   */
-//  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
-//    // TODO this method isn't atomic enough, we could start copying and then
-//    // TODO fail for some reason and we would end up with znodes we don't want.
-//    SortedMap<String,SortedSet<String>> queues =
-//        new TreeMap<String,SortedSet<String>>();
-//    try {
-//      String nodePath = this.zookeeperWrapper.getZNode(rsZNode, znode);
-//      List<String> clusters = this.zookeeperWrapper.listZnodes(nodePath, null);
-//      // We have a lock znode in there, it will count as one.
-//      if (clusters == null || clusters.size() <= 1) {
-//        return queues;
-//      }
-//      // The lock isn't a peer cluster, remove it
-//      clusters.remove(RS_LOCK_ZNODE);
-//      for (String cluster : clusters) {
-//        // We add the name of the recovered RS to the new znode, we can even
-//        // do that for queues that were recovered 10 times giving a znode like
-//        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
-//        String newCluster = cluster+"-"+znode;
-//        String newClusterZnode =
-//            this.zookeeperWrapper.getZNode(rsServerNameZnode, newCluster);
-//        this.zookeeperWrapper.ensureExists(newClusterZnode);
-//        String clusterPath = this.zookeeperWrapper.getZNode(nodePath, cluster);
-//        List<String> hlogs = this.zookeeperWrapper.listZnodes(clusterPath, null);
-//        // That region server didn't have anything to replicate for this cluster
-//        if (hlogs == null || hlogs.size() == 0) {
-//          continue;
-//        }
-//        SortedSet<String> logQueue = new TreeSet<String>();
-//        queues.put(newCluster, logQueue);
-//        for (String hlog : hlogs) {
-//          String position = Bytes.toString(
-//              this.zookeeperWrapper.getData(clusterPath, hlog));
-//          LOG.debug("Creating " + hlog + " with data " + position);
-//          this.zookeeperWrapper.writeZNode(newClusterZnode, hlog, position);
-//          logQueue.add(hlog);
-//        }
-//      }
-//    } catch (InterruptedException e) {
-//      LOG.warn(e);
-//      return null;
-//    } catch (KeeperException e) {
-//      LOG.warn(e);
-//      return null;
-//    }
-//    return queues;
-//  }
-//
-//  /**
-//   * Delete a complete queue of hlogs
-//   * @param peerZnode znode of the peer cluster queue of hlogs to delete
-//   */
-//  public void deleteSource(String peerZnode) {
-//    try {
-//      this.zookeeperWrapper.deleteZNode(
-//          this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
-//    } catch (InterruptedException e) {
-//      LOG.error(e);
-//    } catch (KeeperException e) {
-//      LOG.error(e);
-//    }
-//  }
-//
-//  /**
-//   * Recursive deletion of all znodes in specified rs' znode
-//   * @param znode
-//   */
-//  public void deleteRsQueues(String znode) {
-//    try {
-//      this.zookeeperWrapper.deleteZNode(
-//          this.zookeeperWrapper.getZNode(rsZNode, znode), true);
-//    } catch (InterruptedException e) {
-//      LOG.error(e);
-//    } catch (KeeperException e) {
-//      LOG.error(e);
-//    }
-//  }
-//
-//  /**
-//   * Delete this cluster's queues
-//   */
-//  public void deleteOwnRSZNode() {
-//    deleteRsQueues(this.rsServerNameZnode);
-//  }
-//
-//  /**
-//   * Get the position of the specified hlog in the specified peer znode
-//   * @param peerId znode of the peer cluster
-//   * @param hlog name of the hlog
-//   * @return the position in that hlog
-//   */
-//  public long getHLogRepPosition(String peerId, String hlog) {
-//    String clusterZnode =
-//        this.zookeeperWrapper.getZNode(rsServerNameZnode, peerId);
-//    String data = Bytes.toString(
-//        this.zookeeperWrapper.getData(clusterZnode, hlog));
-//    return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
-//  }
-//
-//  /**
-//   * Tells if this cluster replicates or not
-//   *
-//   * @return if this is a master
-//   */
-//  public boolean isReplicationMaster() {
-//    return this.replicationMaster;
-//  }
-//
-//  /**
-//   * Get the identification of the cluster
-//   *
-//   * @return the id for the cluster
-//   */
-//  public String getClusterId() {
-//    return this.clusterId;
-//  }
-//
-//  /**
-//   * Get a map of all peer clusters
-//   * @return map of peer cluster, zk address to ZKW
-//   */
-//  public Map<String, ZooKeeperWrapper> getPeerClusters() {
-//    return this.peerClusters;
-//  }
-//
-//  /**
-//   * Watcher for the status of the replication
-//   */
-//  public class ReplicationStatusWatcher implements Watcher {
-//    @Override
-//    public void process(WatchedEvent watchedEvent) {
-//      Event.EventType type = watchedEvent.getType();
-//      LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
-//      if (type.equals(Event.EventType.NodeDataChanged)) {
-//        setReplicating();
-//      }
-//    }
-//  }
+  /**
+   * Constructor used by region servers, connects to the peer cluster right away.
+   * NOTE(review): the body uses zookeeperWrapper while the field declared on
+   * this class is named zookeeper; confirm which handle is intended.
+   * @param zookeeper our handle on zookeeper
+   * @param conf conf to use
+   * @param replicating atomic boolean to start/stop replication
+   * @param rsName name of this region server; if null no peers are connected
+   * @throws IOException
+   */
+  public ReplicationZookeeperWrapper(final ZooKeeperWatcher zookeeper,
+      final Configuration conf,
+      final AtomicBoolean replicating, String rsName) throws IOException {
+    this.zookeeper = zookeeper;
+    this.conf = conf;
+    String replicationZNodeName =
+        conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName =
+        conf.get("zookeeper.znode.replication.peers", "peers");
+    String repMasterZNodeName =
+        conf.get("zookeeper.znode.replication.master", "master");
+    this.replicationStateNodeName =
+        conf.get("zookeeper.znode.replication.state", "state");
+    String clusterIdZNodeName =
+        conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+    String rsZNodeName =
+        conf.get("zookeeper.znode.replication.rs", "rs");
+    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+          this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+          this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+    this.peerClusters = new HashMap<String, ReplicationZookeeperWrapper>();
+    this.replicationZNode = this.zookeeper.zookeeperWrapper.getZNode(
+        zookeeperWrapper.getParentZNode(), replicationZNodeName);
+    this.peersZNode =
+        zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
+    this.rsZNode =
+        zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
+
+    this.replicating = replicating;
+    setReplicating();
+    String idResult = Bytes.toString(
+        this.zookeeperWrapper.getData(this.replicationZNode,
+        clusterIdZNodeName));
+    this.clusterId =
+        idResult == null ?
+            Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
+    String address = Bytes.toString(
+        this.zookeeperWrapper.getData(this.replicationZNode,
+          repMasterZNodeName));
+    this.replicationMaster = thisCluster.equals(address);
+    LOG.info("This cluster (" + thisCluster + ") is a "
+          + (this.replicationMaster ? "master" : "slave") + " for replication" +
+          ", compared with (" + address + ")");
+    if (rsName != null) {
+      this.rsServerNameZnode =
+          this.zookeeperWrapper.getZNode(rsZNode, rsName);
+      List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
+          new ReplicationStatusWatcher());
+      if (znodes != null) {
+        for (String znode : znodes) {
+          connectToPeer(znode);
+        }
+      }
+    } else {
+      this.rsServerNameZnode = null;
+    }
+
+  }
+
+  /**
+   * Lists the region servers of the given peer cluster. NOTE(review): the
+   * map holds ReplicationZookeeperWrapper values per its declaration -- confirm.
+   * @param peerClusterId (byte) the cluster to interrogate
+   * @return addresses of all region servers, empty if the peer is unknown
+   */
+  public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+    ZooKeeperWrapper peer = this.peerClusters.size() == 0 ?
+        null : this.peerClusters.get(peerClusterId);
+    if (peer == null) {
+      return new ArrayList<HServerAddress>(0);
+    }
+    return peer.scanRSDirectory();
+  }
+
+  /**
+   * Connects to a peer and registers it in this RS's replication znode.
+   * @param peerId id of the peer; its znode must hold quorum:port:parent
+   */
+  private void connectToPeer(String peerId) throws IOException {
+    // Decode once: appending the raw byte[] to a message only prints its hash.
+    String address =
+        Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId));
+    String[] ensemble = address == null ? new String[0] : address.split(":");
+    if (ensemble.length != 3) {
+      throw new IllegalArgumentException(
+          "Wrong format of cluster address: " + address);
+    }
+    Configuration otherConf = new Configuration(this.conf);
+    otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+    otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+    otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+    ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+        "connection to cluster: " + peerId);
+    zkw.registerListener(new ReplicationStatusWatcher());
+    this.peerClusters.put(peerId, zkw);
+    this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+        this.rsServerNameZnode, peerId));
+    LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+  }
+
+  /**
+   * Reads the replication state znode (and watches it); sets the flag if found.
+   */
+  private void setReplicating() {
+    ReplicationStatusWatcher watcher = new ReplicationStatusWatcher();
+    String state = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
+        this.replicationZNode, this.replicationStateNodeName, watcher));
+    if (state != null) {
+      this.replicating.set(state.equals("true"));
+      String label = this.replicating.get() ? "started" : "stopped";
+      LOG.info("Replication is now " + label);
+    }
+  }
+
+  /**
+   * Add a new log to the list of hlogs in zookeeper; failures are only logged.
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void addLogToList(String filename, String clusterId) {
+    try {
+      this.zookeeperWrapper.writeZNode(this.zookeeperWrapper.getZNode(
+          this.rsServerNameZnode, clusterId), filename, "");
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Remove a log from the list of hlogs in zookeeper; failures are only logged.
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void removeLogFromList(String filename, String clusterId) {
+    try {
+      this.zookeeperWrapper.deleteZNode(this.zookeeperWrapper.getZNode(
+          rsServerNameZnode, zookeeperWrapper.getZNode(clusterId, filename)));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Write the given cluster's position in an hlog; failures are only logged.
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   * @param position the position in the file
+   */
+  public void writeReplicationStatus(String filename, String clusterId,
+                                     long position) {
+    try {
+      String clusterZNode = this.zookeeperWrapper.getZNode(
+          this.rsServerNameZnode, clusterId);
+      this.zookeeperWrapper.writeZNode(clusterZNode, filename,
+          Long.toString(position));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Lists the children of the znode returned by getRsZNode() and sets the
+   * supplied watch.
+   * @param watch the watch to set
+   * @return a list of server names
+   */
+  public List<String> getRegisteredRegionServers(Watcher watch) {
+    String rsRoot = this.zookeeperWrapper.getRsZNode();
+    return this.zookeeperWrapper.listZnodes(rsRoot, watch);
+  }
+
+  /**
+   * Lists the replicators that have queues; they can be alive, dead or
+   * simply left over from a previous run.
+   * @param watch the watch to set
+   * @return a list of server names
+   */
+  public List<String> getListOfReplicators(Watcher watch) {
+    return zookeeperWrapper.listZnodes(this.rsZNode, watch);
+  }
+
+  /**
+   * Lists the peer clusters found under the given region server's znode.
+   * @param rs server name of the rs
+   * @param watch the watch to set
+   * @return a list of peer clusters
+   */
+  public List<String> getListPeersForRS(String rs, Watcher watch) {
+    String rsQueues = this.zookeeperWrapper.getZNode(this.rsZNode, rs);
+    return this.zookeeperWrapper.listZnodes(rsQueues, watch);
+  }
+
+  /**
+   * Lists the hlogs found for a peer cluster under a region server's znode.
+   * @param rs server name of the rs
+   * @param id peer cluster
+   * @param watch the watch to set
+   * @return a list of hlogs
+   */
+  public List<String> getListHLogsForPeerForRS(String rs, String id, Watcher watch) {
+    String rsQueues = this.zookeeperWrapper.getZNode(this.rsZNode, rs);
+    return zookeeperWrapper.listZnodes(zookeeperWrapper.getZNode(rsQueues, id), watch);
+  }
+
+  /**
+   * Try to set a lock in another server's znode.
+   * @param znode the server name of the other server
+   * @return true if the lock was acquired, false in every other case
+   */
+  public boolean lockOtherRS(String znode) {
+    try {
+      this.zookeeperWrapper.writeZNode(
+          this.zookeeperWrapper.getZNode(this.rsZNode, znode),
+          RS_LOCK_ZNODE, rsServerNameZnode, true);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      LOG.error(e);
+      return false;
+    } catch (KeeperException e) {
+      LOG.debug("Won't lock " + znode + " because " + e.getMessage());
+      // TODO see if the other still exists!!
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Copies every hlog queue of another region server and returns them sorted
+   * per peer cluster, each key suffixed with the other server's znode.
+   * @param znode server name to copy
+   * @return all hlogs for all peers of that cluster, null if an error occurred
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+    // TODO this method isn't atomic enough, we could start copying and then
+    // TODO fail for some reason and we would end up with znodes we don't want.
+    SortedMap<String,SortedSet<String>> queues =
+        new TreeMap<String,SortedSet<String>>();
+    try {
+      String nodePath = this.zookeeperWrapper.getZNode(rsZNode, znode);
+      List<String> clusters = this.zookeeperWrapper.listZnodes(nodePath, null);
+      // We have a lock znode in there, it will count as one.
+      if (clusters == null || clusters.size() <= 1) {
+        return queues;
+      }
+      // The lock isn't a peer cluster, remove it
+      clusters.remove(RS_LOCK_ZNODE);
+      for (String cluster : clusters) {
+        // We add the name of the recovered RS to the new znode, we can even
+        // do that for queues that were recovered 10 times giving a znode like
+        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+        String newCluster = cluster+"-"+znode;
+        String newClusterZnode =
+            this.zookeeperWrapper.getZNode(rsServerNameZnode, newCluster);
+        this.zookeeperWrapper.ensureExists(newClusterZnode);
+        String clusterPath = this.zookeeperWrapper.getZNode(nodePath, cluster);
+        List<String> hlogs = this.zookeeperWrapper.listZnodes(clusterPath, null);
+        // That region server didn't have anything to replicate for this cluster
+        if (hlogs == null || hlogs.size() == 0) {
+          continue;
+        }
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newCluster, logQueue);
+        for (String hlog : hlogs) {
+          String position = Bytes.toString(
+              this.zookeeperWrapper.getData(clusterPath, hlog));
+          LOG.debug("Creating " + hlog + " with data " + position);
+          this.zookeeperWrapper.writeZNode(newClusterZnode, hlog, position);
+          logQueue.add(hlog);
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      LOG.warn(e);
+      return null;
+    } catch (KeeperException e) {
+      LOG.warn(e);
+      return null;
+    }
+    return queues;
+  }
+
+  /**
+   * Delete the complete queue of hlogs under peer cluster znode peerZnode.
+   */
+  public void deleteSource(String peerZnode) {
+    try {
+      this.zookeeperWrapper.deleteZNode(
+          this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Recursive deletion of all znodes in specified rs' znode
+   * @param znode the rs znode, resolved against the replicating-RS znode
+   */
+  public void deleteRsQueues(String znode) {
+    try {
+      this.zookeeperWrapper.deleteZNode(zookeeperWrapper.getZNode(rsZNode, znode), true);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Delete this region server's own queues, via deleteRsQueues
+   */
+  public void deleteOwnRSZNode() {
+    this.deleteRsQueues(rsServerNameZnode);
+  }
+
+  /**
+   * Reads the position stored for an hlog under the given peer's znode
+   * @param peerId znode of the peer cluster
+   * @param hlog name of the hlog
+   * @return the stored position, or 0 when nothing (or an empty value) is stored
+   */
+  public long getHLogRepPosition(String peerId, String hlog) {
+    String peerQueue =
+        this.zookeeperWrapper.getZNode(this.rsServerNameZnode, peerId);
+    String stored =
+        Bytes.toString(this.zookeeperWrapper.getData(peerQueue, hlog));
+    return (stored != null && stored.length() > 0) ? Long.parseLong(stored) : 0;
+  }
+
+  /**
+   * Reports the master/slave role worked out when this wrapper was built
+   *
+   * @return true if this cluster is a replication master
+   */
+  public boolean isReplicationMaster() {
+    return replicationMaster;
+  }
+
+  /**
+   * Returns the cluster id resolved when this wrapper was built
+   *
+   * @return the id for the cluster
+   */
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * Get the map of all peer clusters, keyed by peer id
+   * @return the internal map itself, not a copy
+   */
+  public Map<String, ReplicationZookeeperWrapper> getPeerClusters() {
+    return peerClusters;
+  }
+
+  /**
+   * Watcher that re-reads the replication state whenever node data changes
+   */
+  public class ReplicationStatusWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      final Event.EventType eventType = watchedEvent.getType();
+      LOG.info("Got event " + eventType + " with path " + watchedEvent.getPath());
+      if (eventType.equals(Event.EventType.NodeDataChanged)) {
+        ReplicationZookeeperWrapper.this.setReplicating();
+      }
+    }
+  }
 
 }