You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 13:58:21 UTC

svn commit: r1446147 [17/35] - in /hbase/branches/hbase-7290v2: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/...

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Thu Feb 14 12:58:12 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.executor.
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;
@@ -44,7 +45,7 @@ import org.apache.zookeeper.KeeperExcept
 public class OpenRegionHandler extends EventHandler {
   private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
 
-  private final RegionServerServices rsServices;
+  protected final RegionServerServices rsServices;
 
   private final HRegionInfo regionInfo;
   private final HTableDescriptor htd;
@@ -85,30 +86,50 @@ public class OpenRegionHandler extends E
 
   @Override
   public void process() throws IOException {
+    boolean openSuccessful = false;
+    final String regionName = regionInfo.getRegionNameAsString();
+
     try {
-      final String name = regionInfo.getRegionNameAsString();
       if (this.server.isStopped() || this.rsServices.isStopping()) {
         return;
       }
       final String encodedName = regionInfo.getEncodedName();
 
+      // 3 different difficult situations can occur
+      // 1) The opening was cancelled. This is an expected situation
+      // 2) The region was hijacked, we no longer have the znode
+      // 3) The region is now marked as online while we're supposed to open. This would be a bug.
+
       // Check that this region is not already online
-      HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
+      if (this.rsServices.getFromOnlineRegions(encodedName) != null) {
+        LOG.error("Region " + encodedName +
+            " was already online when we started processing the opening. " +
+            "Marking this new attempt as failed");
+        tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
+        return;
+      }
 
+      // Check that we're still supposed to open the region and transition.
       // If fails, just return.  Someone stole the region from under us.
-      // Calling transitionZookeeperOfflineToOpening initalizes this.version.
-      if (!transitionZookeeperOfflineToOpening(encodedName,
-          versionOfOfflineNode)) {
-        LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
-          encodedName);
+      // Calling transitionZookeeperOfflineToOpening initializes this.version.
+      if (!isRegionStillOpening()){
+        LOG.error("Region " + encodedName + " opening cancelled");
+        tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
+        return;
+      }
+
+      if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
+        LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
+        // This is a desperate attempt: the znode is unlikely to be ours. But we can't do more.
+        tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
         return;
       }
 
       // Open region.  After a successful open, failures in subsequent
       // processing needs to do a close as part of cleanup.
-      region = openRegion();
+      HRegion region = openRegion();
       if (region == null) {
-        tryTransitionToFailedOpen(regionInfo);
+        tryTransitionFromOpeningToFailedOpen(regionInfo);
         return;
       }
       boolean failed = true;
@@ -120,37 +141,63 @@ public class OpenRegionHandler extends E
       if (failed || this.server.isStopped() ||
           this.rsServices.isStopping()) {
         cleanupFailedOpen(region);
-        tryTransitionToFailedOpen(regionInfo);
+        tryTransitionFromOpeningToFailedOpen(regionInfo);
         return;
       }
 
-      if (!transitionToOpened(region)) {
+
+      if (!isRegionStillOpening() || !transitionToOpened(region)) {
         // If we fail to transition to opened, it's because of one of two cases:
         //    (a) we lost our ZK lease
         // OR (b) someone else opened the region before us
-        // In either case, we don't need to transition to FAILED_OPEN state.
-        // In case (a), the Master will process us as a dead server. In case
-        // (b) the region is already being handled elsewhere anyway.
+        // OR (c) someone cancelled the open
+        // In all cases, we try to transition to failed_open to be safe.
         cleanupFailedOpen(region);
+        tryTransitionFromOpeningToFailedOpen(regionInfo);
         return;
       }
 
-      // One more check to make sure we are opening instead of closing
-      if (!isRegionStillOpening()) {
-        LOG.warn("Open region aborted since it isn't opening any more");
-        cleanupFailedOpen(region);
-        return;
-      }
+      // We have a znode in the opened state now. We can't really delete it, as that is the
+      // master's job. Transitioning to failed open would create a race condition if the master
+      // has already acted on the transition to opened.
+      // Cancelling the open is dangerous, because we would have a state where the master thinks
+      // the region is opened while the region is actually closed. It is a dangerous state
+      // to be in. For this reason, from now on, we're not going back. There is a message in the
+      // finally clause to let the admin know where we stand.
+
 
       // Successful region open, and add it to OnlineRegions
       this.rsServices.addToOnlineRegions(region);
+      openSuccessful = true;
 
       // Done!  Successful region open
-      LOG.debug("Opened " + name + " on server:" +
+      LOG.debug("Opened " + regionName + " on server:" +
         this.server.getServerName());
+
+
     } finally {
-      this.rsServices.getRegionsInTransitionInRS().
+      final Boolean current = this.rsServices.getRegionsInTransitionInRS().
           remove(this.regionInfo.getEncodedNameAsBytes());
+
+      // Let's check if we have met a race condition on open cancellation....
+      // A better solution would be to not have any race condition.
+      // this.rsServices.getRegionsInTransitionInRS().remove(
+      //  this.regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
+      // would help, but we would still have a consistency issue to manage with
+      // 1) this.rsServices.addToOnlineRegions(region);
+      // 2) the ZK state.
+      if (openSuccessful) {
+        if (current == null) {  // Should NEVER happen, but let's be paranoid.
+          LOG.error("Bad state: we've just opened a region that was NOT in transition. Region=" +
+              regionName
+          );
+        } else if (Boolean.FALSE.equals(current)) { // Can happen, if we're really unlucky.
+          LOG.error("Race condition: we've finished to open a region, while a close was requested "
+              + " on region=" + regionName + ". It can be a critical error, as a region that" +
+              " should be closed is now opened."
+          );
+        }
+      }
     }
   }
 
@@ -226,7 +273,8 @@ public class OpenRegionHandler extends E
   /**
    * Thread to run region post open tasks. Call {@link #getException()} after
    * the thread finishes to check for exceptions running
-   * {@link RegionServerServices#postOpenDeployTasks(HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}
+   * {@link RegionServerServices#postOpenDeployTasks(
+   * HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}
    * .
    */
   static class PostOpenDeployTasksThread extends Thread {
@@ -277,10 +325,6 @@ public class OpenRegionHandler extends E
    * @throws IOException
    */
   private boolean transitionToOpened(final HRegion r) throws IOException {
-    if (!isRegionStillOpening()) {
-      LOG.warn("Open region aborted since it isn't opening any more");
-      return false;
-    }
     boolean result = false;
     HRegionInfo hri = r.getRegionInfo();
     final String name = hri.getRegionNameAsString();
@@ -310,11 +354,12 @@ public class OpenRegionHandler extends E
    * @param hri Region we're working on.
    * @return whether znode is successfully transitioned to FAILED_OPEN state.
    */
-  private boolean tryTransitionToFailedOpen(final HRegionInfo hri) {
+  private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) {
     boolean result = false;
     final String name = hri.getRegionNameAsString();
     try {
-      LOG.info("Opening of region " + hri + " failed, marking as FAILED_OPEN in ZK");
+      LOG.info("Opening of region " + hri + " failed, transitioning" +
+          " from OPENING to FAILED_OPEN in ZK, expecting version " + this.version);
       if (ZKAssign.transitionNode(
           this.server.getZooKeeper(), hri,
           this.server.getServerName(),
@@ -335,6 +380,43 @@ public class OpenRegionHandler extends E
   }
 
   /**
+   * Try to transition from OFFLINE to FAILED_OPEN. This function is static to make it usable
+   * before creating the handler.
+   *
+   * This is not guaranteed to succeed, we just do our best.
+   *
+   * @param rsServices
+   * @param hri Region we're working on.
+   * @param versionOfOfflineNode version to be checked.
+   * @return whether znode is successfully transitioned to FAILED_OPEN state.
+   */
+  public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
+       final HRegionInfo hri, final int versionOfOfflineNode) {
+    boolean result = false;
+    final String name = hri.getRegionNameAsString();
+    try {
+      LOG.info("Opening of region " + hri + " failed, transitioning" +
+          " from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode);
+      if (ZKAssign.transitionNode(
+          rsServices.getZooKeeper(), hri,
+          rsServices.getServerName(),
+          EventType.M_ZK_REGION_OFFLINE,
+          EventType.RS_ZK_REGION_FAILED_OPEN,
+          versionOfOfflineNode) == -1) {
+        LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
+            "It's likely that the master already timed out this open " +
+            "attempt, and thus another RS already has the region.");
+      } else {
+        result = true;
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
+    }
+    return result;
+  }
+
+
+  /**
    * @return Instance of HRegion if successful open else null.
    */
   HRegion openRegion() {
@@ -343,7 +425,8 @@ public class OpenRegionHandler extends E
       // Instantiate the region.  This also periodically tickles our zk OPENING
       // state so master doesn't timeout this region in transition.
       region = HRegion.openHRegion(this.regionInfo, this.htd,
-          this.rsServices.getWAL(), this.server.getConfiguration(),
+          this.rsServices.getWAL(this.regionInfo), 
+          this.server.getConfiguration(),
           this.rsServices,
         new CancelableProgressable() {
           public boolean progress() {
@@ -379,7 +462,7 @@ public class OpenRegionHandler extends E
   private boolean isRegionStillOpening() {
     byte[] encodedName = regionInfo.getEncodedNameAsBytes();
     Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
-    return action != null && action.booleanValue();
+    return Boolean.TRUE.equals(action); // true means opening for RIT
   }
 
   /**
@@ -392,10 +475,6 @@ public class OpenRegionHandler extends E
    */
   boolean transitionZookeeperOfflineToOpening(final String encodedName,
       int versionOfOfflineNode) {
-    if (!isRegionStillOpening()) {
-      LOG.warn("Open region aborted since it isn't opening any more");
-      return false;
-    }
     // TODO: should also handle transition from CLOSED?
     try {
       // Initialize the znode version.
@@ -405,6 +484,8 @@ public class OpenRegionHandler extends E
     } catch (KeeperException e) {
       LOG.error("Error transition from OFFLINE to OPENING for region=" +
         encodedName, e);
+      this.version = -1;
+      return false;
     }
     boolean b = isGoodVersion();
     if (!b) {
@@ -436,6 +517,7 @@ public class OpenRegionHandler extends E
       server.abort("Exception refreshing OPENING; region=" + encodedName +
         ", context=" + context, e);
       this.version = -1;
+      return false;
     }
     boolean b = isGoodVersion();
     if (!b) {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Thu Feb 14 12:58:12 2013
@@ -144,7 +144,12 @@ public class Compressor {
       // the status byte also acts as the higher order byte of the dictionary
       // entry
       short dictIdx = toShort(status, in.readByte());
-      byte[] entry = dict.getEntry(dictIdx);
+      byte[] entry;
+      try {
+        entry = dict.getEntry(dictIdx);
+      } catch (Exception ex) {
+        throw new IOException("Unable to uncompress the log entry", ex);
+      }
       if (entry == null) {
         throw new IOException("Missing dictionary entry for index "
             + dictIdx);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Thu Feb 14 12:58:12 2013
@@ -24,8 +24,10 @@ import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -132,28 +135,52 @@ class FSHLog implements HLog, Syncable {
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
-  /*
+  /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
+  private DrainBarrier closeBarrier = new DrainBarrier();
+
+  /**
    * Current log file.
    */
   Writer writer;
 
-  /*
+  /**
    * Map of all log files but the current one.
    */
   final SortedMap<Long, Path> outputfiles =
     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
-  /*
-   * Map of encoded region names to their most recent sequence/edit id in their
-   * memstore.
+
+  /**
+   * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
+   * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
+   * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
+   * force flush them, so we don't care about these numbers messing with anything. */
+  private final Object oldestSeqNumsLock = new Object();
+
+  /**
+   * This lock makes sure only one log roll runs at a time. Should not be taken while
+   * any other lock is held. We don't just use synchronized because that results in bogus and
+   * tedious findbugs warnings when it thinks synchronized controls writer thread safety */
+  private final Object rollWriterLock = new Object();
+
+  /**
+   * Map of encoded region names to their most recent sequence/edit id in their memstore.
    */
-  private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
+  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+  /**
+   * Map of encoded region names to their most recent sequence/edit id in their memstore;
+   * contains the regions that are currently flushing. That way we can store two numbers for
+   * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region.
+   */
+  private final Map<byte[], Long> oldestFlushingSeqNums = new HashMap<byte[], Long>();
 
   private volatile boolean closed = false;
 
   private final AtomicLong logSeqNum = new AtomicLong(0);
 
+  private boolean forMeta = false;
+
   // The timestamp (in ms) when the log file was created.
   private volatile long filenum = -1;
 
@@ -176,10 +203,6 @@ class FSHLog implements HLog, Syncable {
   // of the default Hdfs block size.
   private final long logrollsize;
 
-  // This lock prevents starting a log roll during a cache flush.
-  // synchronized is insufficient because a cache flush spans two method calls.
-  private final Lock cacheFlushLock = new ReentrantLock();
-
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   // locked during appends
@@ -211,15 +234,15 @@ class FSHLog implements HLog, Syncable {
    *
    * @param fs filesystem handle
    * @param root path for stored and archived hlogs
-   * @param logName dir where hlogs are stored
+   * @param logDir dir where hlogs are stored
    * @param conf configuration to use
    * @throws IOException
    */
-  public FSHLog(final FileSystem fs, final Path root, final String logName,
+  public FSHLog(final FileSystem fs, final Path root, final String logDir,
                 final Configuration conf)
   throws IOException {
-    this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, 
-        conf, null, true, null);
+    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, 
+        conf, null, true, null, false);
   }
   
   /**
@@ -227,16 +250,16 @@ class FSHLog implements HLog, Syncable {
    *
    * @param fs filesystem handle
    * @param root path for stored and archived hlogs
-   * @param logName dir where hlogs are stored
-   * @param oldLogName dir where hlogs are archived
+   * @param logDir dir where hlogs are stored
+   * @param oldLogDir dir where hlogs are archived
    * @param conf configuration to use
    * @throws IOException
    */
-  public FSHLog(final FileSystem fs, final Path root, final String logName,
-                final String oldLogName, final Configuration conf)
+  public FSHLog(final FileSystem fs, final Path root, final String logDir,
+                final String oldLogDir, final Configuration conf)
   throws IOException {
-    this(fs, root, logName, oldLogName, 
-        conf, null, true, null);
+    this(fs, root, logDir, oldLogDir, 
+        conf, null, true, null, false);
   }
 
   /**
@@ -248,7 +271,7 @@ class FSHLog implements HLog, Syncable {
    *
    * @param fs filesystem handle
    * @param root path for stored and archived hlogs
-   * @param logName dir where hlogs are stored
+   * @param logDir dir where hlogs are stored
    * @param conf configuration to use
    * @param listeners Listeners on WAL events. Listeners passed here will
    * be registered before we do anything else; e.g. the
@@ -258,11 +281,11 @@ class FSHLog implements HLog, Syncable {
    *        If prefix is null, "hlog" will be used
    * @throws IOException
    */
-  public FSHLog(final FileSystem fs, final Path root, final String logName,
+  public FSHLog(final FileSystem fs, final Path root, final String logDir,
       final Configuration conf, final List<WALActionsListener> listeners,
       final String prefix) throws IOException {
-    this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, 
-        conf, listeners, true, prefix);
+    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, 
+        conf, listeners, true, prefix, false);
   }
 
   /**
@@ -274,7 +297,8 @@ class FSHLog implements HLog, Syncable {
    *
    * @param fs filesystem handle
    * @param root path to where logs and oldlogs
-   * @param oldLogName path to where hlogs are archived
+   * @param logDir dir where hlogs are stored
+   * @param oldLogDir dir where hlogs are archived
    * @param conf configuration to use
    * @param listeners Listeners on WAL events. Listeners passed here will
    * be registered before we do anything else; e.g. the
@@ -283,18 +307,20 @@ class FSHLog implements HLog, Syncable {
    * @param prefix should always be hostname and port in distributed env and
    *        it will be URL encoded before being used.
    *        If prefix is null, "hlog" will be used
+   * @param forMeta if this hlog is meant for meta updates
    * @throws IOException
    */
-  private FSHLog(final FileSystem fs, final Path root, final String logName,
-      final String oldLogName, final Configuration conf, 
+  public FSHLog(final FileSystem fs, final Path root, final String logDir,
+      final String oldLogDir, final Configuration conf, 
       final List<WALActionsListener> listeners,
-      final boolean failIfLogDirExists, final String prefix)
+      final boolean failIfLogDirExists, final String prefix, boolean forMeta)
   throws IOException {
     super();
     this.fs = fs;
     this.rootDir = root;
-    this.dir = new Path(this.rootDir, logName);
-    this.oldLogDir = new Path(this.rootDir, oldLogName);
+    this.dir = new Path(this.rootDir, logDir);
+    this.oldLogDir = new Path(this.rootDir, oldLogDir);
+    this.forMeta = forMeta;
     this.conf = conf;
    
     if (listeners != null) {
@@ -333,15 +359,16 @@ class FSHLog implements HLog, Syncable {
     // If prefix is null||empty then just name it hlog
     this.prefix = prefix == null || prefix.isEmpty() ?
         "hlog" : URLEncoder.encode(prefix, "UTF8");
-   
-    if (failIfLogDirExists && this.fs.exists(dir)) {
+
+    boolean dirExists = false;
+    if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
-    if (!fs.mkdirs(dir)) {
+    if (!dirExists && !fs.mkdirs(dir)) {
       throw new IOException("Unable to mkdir " + dir);
     }
 
-    if (!fs.exists(oldLogDir)) {
+    if (!fs.exists(this.oldLogDir)) {
       if (!fs.mkdirs(this.oldLogDir)) {
         throw new IOException("Unable to mkdir " + this.oldLogDir);
       }
@@ -437,7 +464,7 @@ class FSHLog implements HLog, Syncable {
         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
       // This could spin on occasion but better the occasional spin than locking
       // every increment of sequence number.
-      LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
+      LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
     }
   }
 
@@ -466,87 +493,77 @@ class FSHLog implements HLog, Syncable {
   @Override
   public byte [][] rollWriter(boolean force)
       throws FailedLogCloseException, IOException {
-    // Return if nothing to flush.
-    if (!force && this.writer != null && this.numEntries.get() <= 0) {
-      return null;
-    }
-    byte [][] regionsToFlush = null;
-    this.cacheFlushLock.lock();
-    try {
-      this.logRollRunning = true;
-      if (closed) {
-        LOG.debug("HLog closed.  Skipping rolling of writer");
-        return regionsToFlush;
-      }
-      // Do all the preparation outside of the updateLock to block
-      // as less as possible the incoming writes
-      long currentFilenum = this.filenum;
-      Path oldPath = null;
-      if (currentFilenum > 0) {
-        oldPath = computeFilename(currentFilenum);
+    synchronized (rollWriterLock) {
+      // Return if nothing to flush.
+      if (!force && this.writer != null && this.numEntries.get() <= 0) {
+        return null;
       }
-      this.filenum = System.currentTimeMillis();
-      Path newPath = computeFilename();
-
-      // Tell our listeners that a new log is about to be created
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.preLogRoll(oldPath, newPath);
+      byte [][] regionsToFlush = null;
+      try {
+        this.logRollRunning = true;
+        boolean isClosed = closed;
+        if (isClosed || !closeBarrier.beginOp()) {
+          LOG.debug("HLog " + (isClosed ? "closed" : "closing") + ". Skipping rolling of writer");
+          return regionsToFlush;
+        }
+        // Do all the preparation outside of the updateLock to block
+        // the incoming writes as little as possible
+        long currentFilenum = this.filenum;
+        Path oldPath = null;
+        if (currentFilenum > 0) {
+          //computeFilename  will take care of meta hlog filename
+          oldPath = computeFilename(currentFilenum);
+        }
+        this.filenum = System.currentTimeMillis();
+        Path newPath = computeFilename();
+
+        // Tell our listeners that a new log is about to be created
+        if (!this.listeners.isEmpty()) {
+          for (WALActionsListener i : this.listeners) {
+            i.preLogRoll(oldPath, newPath);
+          }
         }
-      }
-      FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-      // Can we get at the dfsclient outputstream?  If an instance of
-      // SFLW, it'll have done the necessary reflection to get at the
-      // protected field name.
-      FSDataOutputStream nextHdfsOut = null;
-      if (nextWriter instanceof SequenceFileLogWriter) {
-        nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
-      }
-
-      synchronized (updateLock) {
-        // Clean up current writer.
-        Path oldFile = cleanupCurrentWriter(currentFilenum);
-        this.writer = nextWriter;
-        this.hdfs_out = nextHdfsOut;
-
-        LOG.info((oldFile != null?
-            "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
-            this.numEntries.get() +
-            ", filesize=" +
-            this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
-          " for " + FSUtils.getPath(newPath));
-        this.numEntries.set(0);
-      }
-      // Tell our listeners that a new log was created
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.postLogRoll(oldPath, newPath);
+        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+        // Can we get at the dfsclient outputstream?  If an instance of
+        // SFLW, it'll have done the necessary reflection to get at the
+        // protected field name.
+        FSDataOutputStream nextHdfsOut = null;
+        if (nextWriter instanceof SequenceFileLogWriter) {
+          nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+        }
+
+        Path oldFile = null;
+        int oldNumEntries = 0;
+        synchronized (updateLock) {
+          // Clean up current writer.
+          oldNumEntries = this.numEntries.get();
+          oldFile = cleanupCurrentWriter(currentFilenum);
+          this.writer = nextWriter;
+          this.hdfs_out = nextHdfsOut;
+          this.numEntries.set(0);
+        }
+        LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
+          + ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
+          : "" ) + "; new path=" + FSUtils.getPath(newPath));
+
+        // Tell our listeners that a new log was created
+        if (!this.listeners.isEmpty()) {
+          for (WALActionsListener i : this.listeners) {
+            i.postLogRoll(oldPath, newPath);
+          }
         }
-      }
 
-      // Can we delete any of the old log files?
-      if (this.outputfiles.size() > 0) {
-        if (this.lastSeqWritten.isEmpty()) {
-          LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
-          // If so, then no new writes have come in since all regions were
-          // flushed (and removed from the lastSeqWritten map). Means can
-          // remove all but currently open log file.
-          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-            archiveLogFile(e.getValue(), e.getKey());
-          }
-          this.outputfiles.clear();
-        } else {
-          regionsToFlush = cleanOldLogs();
+        // Can we delete any of the old log files?
+        if (getNumLogFiles() > 0) {
+          cleanOldLogs();
+          regionsToFlush = getRegionsToForceFlush();
         }
-      }
-    } finally {
-      try {
-        this.logRollRunning = false;
       } finally {
-        this.cacheFlushLock.unlock();
+        this.logRollRunning = false;
+        closeBarrier.endOp();
       }
+      return regionsToFlush;
     }
-    return regionsToFlush;
   }
 
   /**
@@ -561,6 +578,9 @@ class FSHLog implements HLog, Syncable {
    */
   protected Writer createWriterInstance(final FileSystem fs, final Path path,
       final Configuration conf) throws IOException {
+    if (forMeta) {
+      //TODO: set a higher replication for the hlog files (HBASE-6773)
+    }
     return HLogFactory.createWriter(fs, path, conf);
   }
 
@@ -571,36 +591,64 @@ class FSHLog implements HLog, Syncable {
    * encoded region names to flush.
    * @throws IOException
    */
-  private byte [][] cleanOldLogs() throws IOException {
-    Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
+  private void cleanOldLogs() throws IOException {
+    long oldestOutstandingSeqNum = Long.MAX_VALUE;
+    synchronized (oldestSeqNumsLock) {
+      Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
+        ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
+      Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
+        ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
+      oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
+    }
+
     // Get the set of all log files whose last sequence number is smaller than
     // the oldest edit's sequence number.
     TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
         oldestOutstandingSeqNum).keySet());
     // Now remove old log files (if any)
-    int logsToRemove = sequenceNumbers.size();
-    if (logsToRemove > 0) {
-      if (LOG.isDebugEnabled()) {
-        // Find associated region; helps debugging.
-        byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-        LOG.debug("Found " + logsToRemove + " hlogs to remove" +
+    if (LOG.isDebugEnabled()) {
+      if (sequenceNumbers.size() > 0) {
+        LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
           " out of total " + this.outputfiles.size() + ";" +
-          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
-          " from region " + Bytes.toStringBinary(oldestRegion));
+          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
       }
-      for (Long seq : sequenceNumbers) {
-        archiveLogFile(this.outputfiles.remove(seq), seq);
+    }
+    for (Long seq : sequenceNumbers) {
+      archiveLogFile(this.outputfiles.remove(seq), seq);
+    }
+  }
+
+  /**
+   * Return regions that have edits that are equal or less than a certain sequence number.
+   * Static due to some old unit test.
+   * @param walSeqNum The sequence number to compare with.
+   * @param regionsToSeqNums Encoded region names to sequence ids
+   * @return All regions whose {@code seqNum <= walSeqNum}. Null if no regions found.
+   */
+  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
+      final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
+    List<byte[]> regions = null;
+    for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
+      if (e.getValue().longValue() <= walSeqNum) {
+        if (regions == null) regions = new ArrayList<byte[]>();
+        regions.add(e.getKey());
       }
     }
+    return regions == null ? null : regions
+        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
+  }
 
+  private byte[][] getRegionsToForceFlush() throws IOException {
     // If too many log files, figure which regions we need to flush.
     // Array is an array of encoded region names.
     byte [][] regions = null;
-    int logCount = this.outputfiles.size();
+    int logCount = getNumLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
       // This is an array of encoded region names.
-      regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
-        this.lastSeqWritten);
+      synchronized (oldestSeqNumsLock) {
+        regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
+          this.oldestUnflushedSeqNums);
+      }
       if (regions != null) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < regions.length; i++) {
@@ -616,29 +664,6 @@ class FSHLog implements HLog, Syncable {
   }
 
   /*
-   * @return Logs older than this id are safe to remove.
-   */
-  private Long getOldestOutstandingSeqNum() {
-    return Collections.min(this.lastSeqWritten.values());
-  }
-
-  /**
-   * @param oldestOutstandingSeqNum
-   * @return (Encoded) name of oldest outstanding region.
-   */
-  private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
-    byte [] oldestRegion = null;
-    for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
-      if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
-        // Key is encoded region name.
-        oldestRegion = e.getKey();
-        break;
-      }
-    }
-    return oldestRegion;
-  }
-
-  /*
    * Cleans up current writer closing and adding to outputfiles.
    * Presumes we're operating inside an updateLock scope.
    * @return Path to current writer or null if none.
@@ -729,7 +754,11 @@ class FSHLog implements HLog, Syncable {
     if (filenum < 0) {
       throw new RuntimeException("hlog file number can't be < 0");
     }
-    return new Path(dir, prefix + "." + filenum);
+    String child = prefix + "." + filenum;
+    if (forMeta) {
+      child += HLog.META_HLOG_FILE_EXTN;
+    }
+    return new Path(dir, child);
   }
 
   @Override
@@ -766,33 +795,39 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
     try {
       logSyncerThread.close();
       // Make sure we synced everything
       logSyncerThread.join(this.optionalFlushInterval*2);
     } catch (InterruptedException e) {
       LOG.error("Exception while waiting for syncer thread to die", e);
+      Thread.currentThread().interrupt();
     }
-
-    cacheFlushLock.lock();
     try {
-      // Tell our listeners that the log is closing
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.logCloseRequested();
-        }
+      // Prevent all further flushing and rolling.
+      closeBarrier.stopAndDrainOps();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for cache flushes and log rolls", e);
+      Thread.currentThread().interrupt();
+    }
+
+    // Tell our listeners that the log is closing
+    if (!this.listeners.isEmpty()) {
+      for (WALActionsListener i : this.listeners) {
+        i.logCloseRequested();
       }
-      synchronized (updateLock) {
-        this.closed = true;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("closing hlog writer in " + this.dir.toString());
-        }
-        if (this.writer != null) {
-          this.writer.close();
-        }
+    }
+    synchronized (updateLock) {
+      this.closed = true;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing hlog writer in " + this.dir.toString());
+      }
+      if (this.writer != null) {
+        this.writer.close();
       }
-    } finally {
-      cacheFlushLock.unlock();
     }
   }
 
@@ -824,7 +859,7 @@ class FSHLog implements HLog, Syncable {
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
+      this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
       txid = this.unflushedEntries.incrementAndGet();
@@ -896,7 +931,7 @@ class FSHLog implements HLog, Syncable {
         // Use encoded name.  Its shorter, guaranteed unique and a subset of
         // actual  name.
         byte [] encodedRegionName = info.getEncodedNameAsBytes();
-        this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
+        this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
@@ -1028,7 +1063,11 @@ class FSHLog implements HLog, Syncable {
     Writer tempWriter;
     synchronized (this.updateLock) {
       if (this.closed) return;
-      tempWriter = this.writer; // guaranteed non-null
+      // Guaranteed non-null.
+      // Note that parallel sync can close tempWriter.
+      // The current method of dealing with this is to catch exceptions.
+      // See HBASE-4387, HBASE-5623, HBASE-7329.
+      tempWriter = this.writer;
     }
     // if the transaction that we are interested in is already 
     // synced, then return immediately.
@@ -1064,9 +1103,11 @@ class FSHLog implements HLog, Syncable {
       }
       try {
         tempWriter.sync();
-      } catch(IOException io) {
+      } catch(IOException ex) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
+          // TODO: we don't actually need to do it for concurrent close - what is the point
+          //       of syncing new unrelated writer? Keep behavior for now.
           tempWriter = this.writer;
           tempWriter.sync();
         }
@@ -1074,6 +1115,9 @@ class FSHLog implements HLog, Syncable {
       this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
 
       this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
+      // TODO: preserving the old behavior for now, but this check is strange. It's not
+      //       protected by any locks here, so for all we know log rolling might start
+      //       as soon as we enter the "if". Is this a best-effort optimization check?
       if (!this.logRollRunning) {
         checkLowReplication();
         try {
@@ -1236,107 +1280,61 @@ class FSHLog implements HLog, Syncable {
     return outputfiles.size();
   }
 
-  private byte[] getSnapshotName(byte[] encodedRegionName) {
-    byte snp[] = new byte[encodedRegionName.length + 3];
-    // an encoded region name has only hex digits. s, n or p are not hex
-    // and therefore snapshot-names will never collide with
-    // encoded-region-names
-    snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
-    for (int i = 0; i < encodedRegionName.length; i++) {
-      snp[i+3] = encodedRegionName[i];
-    }
-    return snp;
-  }
-
   @Override
-  public long startCacheFlush(final byte[] encodedRegionName) {
-    this.cacheFlushLock.lock();
-    Long seq = this.lastSeqWritten.remove(encodedRegionName);
-    // seq is the lsn of the oldest edit associated with this region. If a
-    // snapshot already exists - because the last flush failed - then seq will
-    // be the lsn of the oldest edit in the snapshot
-    if (seq != null) {
-      // keeping the earliest sequence number of the snapshot in
-      // lastSeqWritten maintains the correctness of
-      // getOldestOutstandingSeqNum(). But it doesn't matter really because
-      // everything is being done inside of cacheFlush lock.
-      Long oldseq =
-        lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
-      if (oldseq != null) {
-        LOG.error("Logic Error Snapshot seq id from earlier flush still" +
-            " present! for region " + Bytes.toString(encodedRegionName) +
-            " overwritten oldseq=" + oldseq + "with new seq=" + seq);
-        Runtime.getRuntime().halt(1);
-      }
+  public Long startCacheFlush(final byte[] encodedRegionName) {
+    Long oldRegionSeqNum = null;
+    if (!closeBarrier.beginOp()) {
+      return null;
+    }
+    synchronized (oldestSeqNumsLock) {
+      oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
+      if (oldRegionSeqNum != null) {
+        Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
+        assert oldValue == null : "Flushing map not cleaned up for "
+          + Bytes.toString(encodedRegionName);
+      }
+    }
+    if (oldRegionSeqNum == null) {
+      // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
+      //       the region is already flushing (which would make this call invalid), or there
+      //       were no appends after last flush, so why are we starting flush? Maybe we should
+      //       assert not null, and switch to "long" everywhere. Less rigorous, but safer,
+      //       alternative is telling the caller to stop. For now preserve old logic.
+      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+        + Bytes.toString(encodedRegionName) + "]");
     }
     return obtainSeqNum();
   }
 
   @Override
-  public void completeCacheFlush(final byte [] encodedRegionName,
-      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
-  throws IOException {
-    try {
-      if (this.closed) {
-        return;
-      }
-      long txid = 0;
-      synchronized (updateLock) {
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        WALEdit edit = completeCacheFlushLogEdit();
-        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
-            System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-        logSyncerThread.append(new Entry(key, edit));
-        txid = this.unflushedEntries.incrementAndGet();
-        long took = EnvironmentEdgeManager.currentTimeMillis() - now;
-        long len = 0;
-        for (KeyValue kv : edit.getKeyValues()) {
-          len += kv.getLength();
-        }
-        this.metrics.finishAppend(took, len);
-        this.numEntries.incrementAndGet();
-      }
-      // sync txn to file system
-      this.sync(txid);
-
-    } finally {
-      // updateLock not needed for removing snapshot's entry
-      // Cleaning up of lastSeqWritten is in the finally clause because we
-      // don't want to confuse getOldestOutstandingSeqNum()
-      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
-      this.cacheFlushLock.unlock();
+  public void completeCacheFlush(final byte [] encodedRegionName)
+  {
+    synchronized (oldestSeqNumsLock) {
+      this.oldestFlushingSeqNums.remove(encodedRegionName);
     }
-  }
-
-  private WALEdit completeCacheFlushLogEdit() {
-    KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null,
-      System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH);
-    WALEdit e = new WALEdit();
-    e.add(kv);
-    return e;
+    closeBarrier.endOp();
   }
 
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
-    Long snapshot_seq =
-      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
-    if (snapshot_seq != null) {
-      // updateLock not necessary because we are racing against
-      // lastSeqWritten.putIfAbsent() in append() and we will always win
-      // before releasing cacheFlushLock make sure that the region's entry in
-      // lastSeqWritten points to the earliest edit in the region
-      Long current_memstore_earliest_seq =
-        this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
-      if (current_memstore_earliest_seq != null &&
-          (current_memstore_earliest_seq.longValue() <=
-            snapshot_seq.longValue())) {
-        LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) +
-            "acquired edits out of order current memstore seq=" +
-            current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
-        Runtime.getRuntime().halt(1);
-      }
+    Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
+    synchronized (oldestSeqNumsLock) {
+      seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
+      if (seqNumBeforeFlushStarts != null) {
+        currentSeqNum =
+          this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
+      }
+    }
+    closeBarrier.endOp();
+    if ((currentSeqNum != null)
+        && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
+      String errorStr = "Region " + Bytes.toString(encodedRegionName) +
+          "acquired edits out of order current memstore seq=" + currentSeqNum
+          + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
+      LOG.error(errorStr);
+      assert false : errorStr;
+      Runtime.getRuntime().halt(1);
     }
-    this.cacheFlushLock.unlock();
   }
 
   @Override
@@ -1401,6 +1399,12 @@ class FSHLog implements HLog, Syncable {
     return lastDeferredTxid > syncedTillHere;
   }
 
+  @Override
+  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
+    Long result = oldestUnflushedSeqNums.get(encodedRegionName);
+    return result == null ? HConstants.NO_SEQNUM : result.longValue();
+  }
+
   /**
    * Pass one or more log file names and it will either dump out a text version
    * on <code>stdout</code> or split the specified log files.

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Feb 14 12:58:12 2013
@@ -50,6 +50,8 @@ public interface HLog {
   /** File Extension used while splitting an HLog into regions (HBASE-2312) */
   public static final String SPLITTING_EXT = "-splitting";
   public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
+  /** The META region's HLog filename extension */
+  public static final String META_HLOG_FILE_EXTN = ".meta";
 
   /*
    * Name of directory that holds recovered edits written by the wal log
@@ -71,6 +73,7 @@ public interface HLog {
     void seek(long pos) throws IOException;
 
     long getPosition() throws IOException;
+    void reset() throws IOException;
   }
 
   public interface Writer {
@@ -159,14 +162,14 @@ public interface HLog {
     }
   }
 
-  /*
+  /**
    * registers WALActionsListener
    * 
    * @param listener
    */
   public void registerWALActionsListener(final WALActionsListener listener);
 
-  /*
+  /**
    * unregisters WALActionsListener
    * 
    * @param listener
@@ -197,18 +200,10 @@ public interface HLog {
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    * 
-   * Because a log cannot be rolled during a cache flush, and a cache flush
-   * spans two method calls, a special lock needs to be obtained so that a cache
-   * flush cannot start when the log is being rolled and the log cannot be
-   * rolled during a cache flush.
-   * 
    * <p>
-   * Note that this method cannot be synchronized because it is possible that
-   * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
-   * start which would obtain the lock on this but block on obtaining the
-   * cacheFlushLock and then completeCacheFlush could be called which would wait
-   * for the lock on this and consequently never release the cacheFlushLock
-   * 
+   * The implementation is synchronized in order to make sure there's one rollWriter
+   * running at any given time.
+   *
    * @return If lots of logs, flush the returned regions so next time through we
    *         can clean logs. Returns null if nothing to flush. Names are actual
    *         region names as returned by {@link HRegionInfo#getEncodedName()}
@@ -220,17 +215,9 @@ public interface HLog {
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    * 
-   * Because a log cannot be rolled during a cache flush, and a cache flush
-   * spans two method calls, a special lock needs to be obtained so that a cache
-   * flush cannot start when the log is being rolled and the log cannot be
-   * rolled during a cache flush.
-   * 
    * <p>
-   * Note that this method cannot be synchronized because it is possible that
-   * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
-   * start which would obtain the lock on this but block on obtaining the
-   * cacheFlushLock and then completeCacheFlush could be called which would wait
-   * for the lock on this and consequently never release the cacheFlushLock
+   * The implementation is synchronized in order to make sure there's one rollWriter
+   * running at any given time.
    * 
    * @param force
    *          If true, force creation of a new writer even if no entries have
@@ -334,53 +321,33 @@ public interface HLog {
   public long obtainSeqNum();
 
   /**
-   * By acquiring a log sequence ID, we can allow log messages to continue while
-   * we flush the cache.
-   * 
-   * Acquire a lock so that we do not roll the log between the start and
-   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
-   * not appear in the correct logfile.
-   * 
-   * Ensuring that flushes and log-rolls don't happen concurrently also allows
-   * us to temporarily put a log-seq-number in lastSeqWritten against the region
-   * being flushed that might not be the earliest in-memory log-seq-number for
-   * that region. By the time the flush is completed or aborted and before the
-   * cacheFlushLock is released it is ensured that lastSeqWritten again has the
-   * oldest in-memory edit's lsn for the region that was being flushed.
-   * 
-   * In this method, by removing the entry in lastSeqWritten for the region
-   * being flushed we ensure that the next edit inserted in this region will be
-   * correctly recorded in
-   * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The
-   * lsn of the earliest in-memory lsn - which is now in the memstore snapshot -
-   * is saved temporarily in the lastSeqWritten map while the flush is active.
-   * 
-   * @return sequence ID to pass
-   *         {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[],
-   *         byte[], long)}
-   * @see #completeCacheFlush(byte[], byte[], long, boolean)
-   * @see #abortCacheFlush(byte[])
+   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
+   * in order to be able to do cleanup. This method tells WAL that some region is about
+   * to flush memstore.
+   *
+   * We stash the oldest seqNum for the region, and let the next edit inserted in this
+   * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
+   * as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
+   * in case of flush succeeding, the seqNum of that first edit after start becomes the
+   * valid oldest seqNum for this region.
+   *
+   * @return current seqNum, to pass on to flushers (who will put it into the metadata of
+   *         the resulting file as an upper-bound seqNum for that file), or NULL if flush
+   *         should not be started.
    */
-  public long startCacheFlush(final byte[] encodedRegionName);
+  public Long startCacheFlush(final byte[] encodedRegionName);
 
   /**
-   * Complete the cache flush
-   * 
-   * Protected by cacheFlushLock
-   * 
-   * @param encodedRegionName
-   * @param tableName
-   * @param logSeqId
-   * @throws IOException
+   * Complete the cache flush.
+   * @param encodedRegionName Encoded region name.
    */
-  public void completeCacheFlush(final byte[] encodedRegionName,
-      final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
-      throws IOException;
+  public void completeCacheFlush(final byte[] encodedRegionName);
 
   /**
    * Abort a cache flush. Call if the flush fails. Note that the only recovery
    * for an aborted flush currently is a restart of the regionserver so the
-   * snapshot content dropped by the failure gets restored to the memstore.
+   * snapshot content dropped by the failure gets restored to the memstore.
+   * @param encodedRegionName Encoded region name.
    */
   public void abortCacheFlush(byte[] encodedRegionName);
 
@@ -395,4 +362,11 @@ public interface HLog {
    * @return lowReplicationRollEnabled
    */
   public boolean isLowReplicationRollEnabled();
+
+  /** Gets the earliest sequence number in the memstore for this particular region.
+   * This can serve as best-effort "recent" WAL number for this region.
+   * @param encodedRegionName The region to get the number for.
+   * @return The number if present, HConstants.NO_SEQNUM if absent.
+   */
+  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java Thu Feb 14 12:58:12 2013
@@ -26,9 +26,9 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
 
@@ -50,6 +50,13 @@ public class HLogFactory {
         final String prefix) throws IOException {
       return new FSHLog(fs, root, logName, conf, listeners, prefix);
     }
+
+    public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
+        final Configuration conf, final List<WALActionsListener> listeners,
+        final String prefix) throws IOException {
+      return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, 
+            conf, listeners, false, prefix, true);
+    }
     
     /*
      * WAL Reader
@@ -62,7 +69,9 @@ public class HLogFactory {
     }
     
     /**
-     * Create a reader for the WAL.
+     * Create a reader for the WAL. If you are reading from a file that's being written to
+     * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this
+     * method, then just seek back to the last known good position.
      * @return A WAL reader.  Close when done with it.
      * @throws IOException
      */

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Feb 14 12:58:12 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -302,6 +303,11 @@ public class HLogSplitter {
             + ": " + logPath + ", length=" + logLength);
         Reader in = null;
         try {
+          // Strictly speaking, meta-only hlogs do not need to go through the
+          // process of parsing and segregating by region, since all of the logs
+          // are for meta only. However, there is a sequence number that can be
+          // obtained only by parsing, so for now we parse all files.
+          // TODO: optimize this part somehow
           in = getReader(fs, log, conf, skipErrors);
           if (in != null) {
             parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Thu Feb 14 12:58:12 2013
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.util.Byte
 public class HLogUtil {
   static final Log LOG = LogFactory.getLog(HLogUtil.class);
 
-  static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
-
   /**
    * @param family
    * @return true if the column is a meta column
@@ -76,7 +74,8 @@ public class HLogUtil {
   /**
    * Pattern used to validate a HLog file name
    */
-  private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
+  private static final Pattern pattern = 
+      Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
 
   /**
    * @param filename
@@ -243,32 +242,6 @@ public class HLogUtil {
   }
 
   /**
-   * Return regions (memstores) that have edits that are equal or less than the
-   * passed <code>oldestWALseqid</code>.
-   * 
-   * @param oldestWALseqid
-   * @param regionsToSeqids
-   *          Encoded region names to sequence ids
-   * @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
-   *         necessarily in order). Null if no regions found.
-   */
-  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
-      final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
-    // This method is static so it can be unit tested the easier.
-    List<byte[]> regions = null;
-    for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
-      if (e.getValue().longValue() <= oldestWALseqid) {
-        if (regions == null)
-          regions = new ArrayList<byte[]>();
-        // Key is encoded region name.
-        regions.add(e.getKey());
-      }
-    }
-    return regions == null ? null : regions
-        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
-  }
-
-  /**
    * Returns sorted set of edit files made by wal-log splitter, excluding files
    * with '.temp' suffix.
    * 
@@ -312,4 +285,11 @@ public class HLogUtil {
     }
     return filesSorted;
   }
+
+  public static boolean isMetaFile(Path p) {
+    if (p.getName().endsWith(HLog.META_HLOG_FILE_EXTN)) {
+      return true;
+    }
+    return false;
+  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Thu Feb 14 12:58:12 2013
@@ -140,15 +140,17 @@ public class SequenceFileLogReader imple
 
   Configuration conf;
   WALReader reader;
+  FileSystem fs;
 
   // Needed logging exceptions
   Path path;
   int edit = 0;
   long entryStart = 0;
+  boolean emptyCompressionContext = true;
   /**
    * Compression context to use reading.  Can be null if no compression.
    */
-  private CompressionContext compressionContext = null;
+  protected CompressionContext compressionContext = null;
 
   protected Class<? extends HLogKey> keyClass;
 
@@ -174,6 +176,7 @@ public class SequenceFileLogReader imple
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
+    this.fs = fs;
 
     // If compression is enabled, new dictionaries are created here.
     boolean compression = reader.isWALCompressionEnabled();
@@ -238,11 +241,22 @@ public class SequenceFileLogReader imple
       throw addFileInfoToException(ioe);
     }
     edit++;
+    if (compressionContext != null && emptyCompressionContext) {
+      emptyCompressionContext = false;
+    }
     return b? e: null;
   }
 
   @Override
   public void seek(long pos) throws IOException {
+    if (compressionContext != null && emptyCompressionContext) {
+      while (next() != null) {
+        if (getPosition() == pos) {
+          emptyCompressionContext = false;
+          break;
+        }
+      }
+    }
     try {
       reader.seek(pos);
     } catch (IOException ioe) {
@@ -252,7 +266,7 @@ public class SequenceFileLogReader imple
 
   @Override
   public long getPosition() throws IOException {
-    return reader.getPosition();
+    return reader != null ? reader.getPosition() : 0;
   }
 
   protected IOException addFileInfoToException(final IOException ioe)
@@ -287,4 +301,11 @@ public class SequenceFileLogReader imple
 
     return ioe;
   }
+
+  @Override
+  public void reset() throws IOException {
+    // Re-open the file to see newly added data; the populated compressionContext is kept.
+    // NOTE(review): the previous reader is not closed here - confirm callers close it first.
+    reader = new WALReader(fs, path, conf);
+  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Thu Feb 14 12:58:12 2013
@@ -40,7 +40,7 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.io.compress.DefaultCodec;
 
 /**
- * Implementation of {@link FSHLog.Writer} that delegates to
+ * Implementation of {@link HLog.Writer} that delegates to
  * SequenceFile.Writer.
  */
 @InterfaceAudience.Private
@@ -244,7 +244,12 @@ public class SequenceFileLogWriter imple
 
   @Override
   public void sync() throws IOException {
-    this.writer.syncFs();
+    try {
+      this.writer.syncFs();
+    } catch (NullPointerException npe) {
+      // Concurrent close...
+      throw new IOException(npe);
+    }
   }
 
   @Override

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Thu Feb 14 12:58:12 2013
@@ -90,7 +90,7 @@ public class ReplicationPeer implements 
   }
 
   private void readPeerStateZnode() throws DeserializationException {
-    this.peerEnabled.set(ReplicationZookeeper.isPeerEnabled(this.peerStateTracker.getData(false)));
+    this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
   }
 
   /**

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Thu Feb 14 12:58:12 2013
@@ -43,13 +43,12 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -87,7 +86,7 @@ import com.google.protobuf.InvalidProtoc
  * </pre>
  */
 @InterfaceAudience.Private
-public class ReplicationZookeeper implements Closeable{
+public class ReplicationZookeeper implements Closeable {
   private static final Log LOG =
     LogFactory.getLog(ReplicationZookeeper.class);
   // Name of znode we use to lock when failover
@@ -111,24 +110,24 @@ public class ReplicationZookeeper implem
   // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
   private String peerStateNodeName;
   private final Configuration conf;
-  // Is this cluster replicating at the moment?
-  private AtomicBoolean replicating;
   // The key to our own cluster
   private String ourClusterKey;
   // Abortable
   private Abortable abortable;
-  private ReplicationStatusTracker statusTracker;
+  private final ReplicationStateInterface replicationState;
 
   /**
    * ZNode content if enabled state.
    */
   // Public so it can be seen by test code.
-  public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+  public static final byte[] ENABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
 
   /**
    * ZNode content if disabled state.
    */
-  static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+  static final byte[] DISABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
 
   /**
    * Constructor used by clients of replication (like master and HBase clients)
@@ -140,8 +139,9 @@ public class ReplicationZookeeper implem
       final ZooKeeperWatcher zk) throws KeeperException {
     this.conf = conf;
     this.zookeeper = zk;
-    this.replicating = new AtomicBoolean();
     setZNodes(abortable);
+    this.replicationState =
+        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
   }
 
   /**
@@ -157,9 +157,10 @@ public class ReplicationZookeeper implem
     this.abortable = server;
     this.zookeeper = server.getZooKeeper();
     this.conf = server.getConfiguration();
-    this.replicating = replicating;
     setZNodes(server);
 
+    this.replicationState =
+        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
     this.peerClusters = new HashMap<String, ReplicationPeer>();
     ZKUtil.createWithParents(this.zookeeper,
         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
@@ -180,11 +181,6 @@ public class ReplicationZookeeper implem
     ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
     ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
-
-    // Set a tracker on replicationStateNodeNode
-    this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
-    statusTracker.start();
-    readReplicationStateZnode();
   }
 
   private void connectExistingPeers() throws IOException, KeeperException {
@@ -366,18 +362,6 @@ public class ReplicationZookeeper implem
   }
 
   /**
-   * Set the new replication state for this cluster
-   * @param newState
-   */
-  public void setReplicating(boolean newState) throws KeeperException {
-    ZKUtil.createWithParents(this.zookeeper,
-        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
-    byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
-    ZKUtil.setData(this.zookeeper,
-      ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
-  }
-
-  /**
    * Remove the peer from zookeeper. which will trigger the watchers on every
    * region server and close their sources
    * @param id
@@ -411,8 +395,12 @@ public class ReplicationZookeeper implem
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
         toByteArray(clusterKey));
+      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+      // peer-state znode. This happens while adding a peer.
+      // The peer state data is set as "ENABLED" by default.
+      ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
+        ENABLED_ZNODE_BYTES);
       // A peer is enabled by default
-      ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id), ENABLED_ZNODE_BYTES);
     } catch (KeeperException e) {
       throw new IOException("Unable to add peer", e);
     }
@@ -637,40 +625,27 @@ public class ReplicationZookeeper implem
     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
   }
 
-  /**
-   * This reads the state znode for replication and sets the atomic boolean
-   */
-  private void readReplicationStateZnode() {
-    try {
-      this.replicating.set(getReplication());
-      LOG.info("Replication is now " + (this.replicating.get()?
-        "started" : "stopped"));
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
-    }
+  private String getRepStateNode() {
+    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
   }
 
   /**
-   * Get the replication status of this cluster. If the state znode doesn't
-   * exist it will also create it and set it true.
+   * Get the replication status of this cluster. If the state znode doesn't exist it will also
+   * create it and set it true.
    * @return returns true when it's enabled, else false
    * @throws KeeperException
    */
   public boolean getReplication() throws KeeperException {
-    byte [] data = this.statusTracker.getData(false);
-    if (data == null || data.length == 0) {
-      setReplicating(true);
-      return true;
-    }
-    try {
-      return isPeerEnabled(data);
-    } catch (DeserializationException e) {
-      throw ZKUtil.convert(e);
-    }
+    return this.replicationState.getState();
   }
 
-  private String getRepStateNode() {
-    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+  /**
+   * Set the new replication state for this cluster
+   * @param newState
+   * @throws KeeperException
+   */
+  public void setReplication(boolean newState) throws KeeperException {
+    this.replicationState.setState(newState);
   }
 
   /**
@@ -817,6 +792,58 @@ public class ReplicationZookeeper implem
   }
 
   /**
+   * It "atomically" copies all the hlogs queues from another region server and returns them all
+   * sorted per peer cluster (appended with the dead server's znode).
+   * @param znode name of the dead region server's znode under the rs znode
+   * @return HLog queues sorted per peer cluster; null when listing the dead server's znode
+   *         yields null (already processed); empty when a ZooKeeper operation failed
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+    String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    try {
+      List<String> peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+      if (peerIdsToProcess == null) return null; // node already processed
+      for (String peerId : peerIdsToProcess) {
+        String newPeerId = peerId + "-" + znode;
+        String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
+        // check the logs queue for the old peer cluster
+        String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+        if (hlogs == null || hlogs.isEmpty()) continue; // empty log queue.
+        // create the new cluster znode
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newPeerId, logQueue);
+        listOfOps.add(ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY));
+        // get the offset of the logs and set it to new znodes
+        for (String hlog : hlogs) {
+          String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+          // add ops for deleting
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
+          logQueue.add(hlog);
+        }
+        // add delete op for peer
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+      }
+      // add delete op for dead rs
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
+      LOG.debug(" The multi list size is: " + listOfOps.size());
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+      LOG.info("Atomically moved the dead regionserver logs. ");
+    } catch (KeeperException e) {
+      // Multi call failed; it looks like some other regionserver took away the logs.
+      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+      queues.clear(); // the move did not complete, so do not report any queue as claimed
+    }
+    return queues;
+  }
+
+  /**
    * This methods copies all the hlogs queues from another region server
    * and returns them all sorted per peer cluster (appended with the dead
    * server's znode)
@@ -1021,6 +1048,15 @@ public class ReplicationZookeeper implem
   public Map<String, ReplicationPeer> getPeerClusters() {
     return this.peerClusters;
   }
+  
+  /**
+   * Determine if a ZK path points to a peer node, judged only by its depth relative to peersZNode.
+   * @param path path to be checked; it is split on "/" and its component count is compared
+   * @return true if path has exactly one more component than peersZNode, otherwise false
+   */
+  public boolean isPeerPath(String path) {
+    return path.split("/").length == peersZNode.split("/").length + 1;
+  }
 
   /**
    * Extracts the znode name of a peer cluster from a ZK path
@@ -1051,8 +1087,7 @@ public class ReplicationZookeeper implem
 
   @Override
   public void close() throws IOException {
-    if (statusTracker != null)
-      statusTracker.stop();
+    if (replicationState != null) replicationState.close();
   }
 
   /**
@@ -1067,7 +1102,10 @@ public class ReplicationZookeeper implem
   static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
       throws NodeExistsException, KeeperException {
     if (ZKUtil.checkExists(zookeeper, path) == -1) {
-      ZKUtil.createAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
+      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+      // peer-state znode. This happens while adding a peer.
+      // The peer state data is set as "ENABLED" by default.
+      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
       return true;
     }
     return false;
@@ -1079,26 +1117,8 @@ public class ReplicationZookeeper implem
    *         serialized ENABLED state.
    * @throws DeserializationException
    */
-  static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
+  static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
     ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
     return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
   }
-
-  /**
-   * Tracker for status of the replication
-   */
-  public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
-    public ReplicationStatusTracker(ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, getRepStateNode(), abortable);
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-        readReplicationStateZnode();
-      }
-    }
-  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Thu Feb 14 12:58:12 2013
@@ -148,7 +148,7 @@ public class ReplicationLogCleaner exten
       this.zkHelper.getZookeeperWatcher().close();
     }
     // Not sure why we're deleting a connection that we never acquired or used
-    HConnectionManager.deleteConnection(this.getConf(), true);
+    HConnectionManager.deleteConnection(this.getConf());
   }
 
   @Override