You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/21 22:36:25 UTC

svn commit: r1436633 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-common/src/test/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/main/java/org/apach...

Author: tedyu
Date: Mon Jan 21 21:36:25 2013
New Revision: 1436633

URL: http://svn.apache.org/viewvc?rev=1436633&view=rev
Log:
HBASE-7329 Revert due to compilation error


Removed:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Mon Jan 21 21:36:25 2013
@@ -126,7 +126,7 @@ public class WALPlayer extends Configure
           Delete del = null;
           KeyValue lastKV = null;
           for (KeyValue kv : value.getKeyValues()) {
-            // filtering HLog meta entries
+            // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
             if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
 
             // A WALEdit may contain multiple operations (HBASE-3584) and/or

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Jan 21 21:36:25 2013
@@ -1553,26 +1553,17 @@ public class HRegion implements HeapSize
     long flushsize = this.memstoreSize.get();
     status.setStatus("Preparing to flush by snapshotting stores");
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
-    long flushSeqId = -1L;
+    long completeSeqId = -1L;
     try {
       // Record the mvcc for all transactions in progress.
       w = mvcc.beginMemstoreInsert();
       mvcc.advanceMemstore(w);
 
-      if (wal != null) {
-        Long startSeqId = wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
-        if (startSeqId == null) {
-          status.setStatus("Flush will not be started for [" + this.regionInfo.getEncodedName()
-              + "] - WAL is going away");
-          return false;
-        }
-        flushSeqId = startSeqId.longValue();
-      } else {
-        flushSeqId = myseqid;
-      }
-
+      sequenceId = (wal == null)? myseqid:
+        wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
+      completeSeqId = this.getCompleteCacheFlushSequenceId(sequenceId);
       for (Store s : stores.values()) {
-        storeFlushers.add(s.getStoreFlusher(flushSeqId));
+        storeFlushers.add(s.getStoreFlusher(completeSeqId));
       }
 
       // prepare flush (take a snapshot)
@@ -1641,14 +1632,22 @@ public class HRegion implements HeapSize
       throw dse;
     }
 
-    // If we get to here, the HStores have been written.
+    // If we get to here, the HStores have been written. If we get an
+    // error in completeCacheFlush it will release the lock it is holding
+
+    // B.  Write a FLUSHCACHE-COMPLETE message to the log.
+    //     This tells future readers that the HStores were emitted correctly,
+    //     and that all updates to the log for this regionName that have lower
+    //     log-sequence-ids can be safely ignored.
     if (wal != null) {
-      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes());
+      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
+        regionInfo.getTableName(), completeSeqId,
+        this.getRegionInfo().isMetaRegion());
     }
 
     // Update the last flushed sequence id for region
     if (this.rsServices != null) {
-      completeSequenceId = flushSeqId;
+      completeSequenceId = completeSeqId;
     }
 
     // C. Finally notify anyone waiting on memstore to clear:
@@ -1673,6 +1672,18 @@ public class HRegion implements HeapSize
     return compactionRequested;
   }
 
+   /**
+   * Get the sequence number to be associated with this cache flush. Used by
+   * TransactionalRegion to not complete pending transactions.
+   *
+   *
+   * @param currentSequenceId
+   * @return sequence id to complete the cache flush with
+   */
+  protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
+    return currentSequenceId;
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Mon Jan 21 21:36:25 2013
@@ -24,10 +24,8 @@ import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +57,6 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -135,45 +132,23 @@ class FSHLog implements HLog, Syncable {
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
-  /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
-  private DrainBarrier closeBarrier = new DrainBarrier();
-
-  /**
+  /*
    * Current log file.
    */
   Writer writer;
 
-  /**
+  /*
    * Map of all log files but the current one.
    */
   final SortedMap<Long, Path> outputfiles =
     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
-
-  /**
-   * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
-   * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
-   * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
-   * force flush them, so we don't care about these numbers messing with anything. */
-  private final Object oldestSeqNumsLock = new Object();
-
-  /**
-   * This lock makes sure only one log roll runs at the same time. Should not be taken while
-   * any other lock is held. We don't just use synchronized because that results in bogus and
-   * tedious findbugs warning when it thinks synchronized controls writer thread safety */
-  private final Object rollWriterLock = new Object();
-
-  /**
-   * Map of encoded region names to their most recent sequence/edit id in their memstore.
+  /*
+   * Map of encoded region names to their most recent sequence/edit id in their
+   * memstore.
    */
-  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
+  private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
-  /**
-   * Map of encoded region names to their most recent sequence/edit id in their memstore;
-   * contains the regions that are currently flushing. That way we can store two numbers for
-   * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region.
-   */
-  private final Map<byte[], Long> oldestFlushingSeqNums = new HashMap<byte[], Long>();
 
   private volatile boolean closed = false;
 
@@ -203,6 +178,10 @@ class FSHLog implements HLog, Syncable {
   // of the default Hdfs block size.
   private final long logrollsize;
 
+  // This lock prevents starting a log roll during a cache flush.
+  // synchronized is insufficient because a cache flush spans two method calls.
+  private final Lock cacheFlushLock = new ReentrantLock();
+
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   // locked during appends
@@ -493,77 +472,88 @@ class FSHLog implements HLog, Syncable {
   @Override
   public byte [][] rollWriter(boolean force)
       throws FailedLogCloseException, IOException {
-    synchronized (rollWriterLock) {
-      // Return if nothing to flush.
-      if (!force && this.writer != null && this.numEntries.get() <= 0) {
-        return null;
+    // Return if nothing to flush.
+    if (!force && this.writer != null && this.numEntries.get() <= 0) {
+      return null;
+    }
+    byte [][] regionsToFlush = null;
+    this.cacheFlushLock.lock();
+    try {
+      this.logRollRunning = true;
+      if (closed) {
+        LOG.debug("HLog closed.  Skipping rolling of writer");
+        return regionsToFlush;
+      }
+      // Do all the preparation outside of the updateLock to block
+      // as less as possible the incoming writes
+      long currentFilenum = this.filenum;
+      Path oldPath = null;
+      if (currentFilenum > 0) {
+        //computeFilename  will take care of meta hlog filename
+        oldPath = computeFilename(currentFilenum);
       }
-      byte [][] regionsToFlush = null;
-      try {
-        this.logRollRunning = true;
-        boolean isClosed = closed;
-        if (isClosed || !closeBarrier.beginOp()) {
-          LOG.debug("HLog " + (isClosed ? "closed" : "closing") + ". Skipping rolling of writer");
-          return regionsToFlush;
-        }
-        // Do all the preparation outside of the updateLock to block
-        // as less as possible the incoming writes
-        long currentFilenum = this.filenum;
-        Path oldPath = null;
-        if (currentFilenum > 0) {
-          //computeFilename  will take care of meta hlog filename
-          oldPath = computeFilename(currentFilenum);
-        }
-        this.filenum = System.currentTimeMillis();
-        Path newPath = computeFilename();
-
-        // Tell our listeners that a new log is about to be created
-        if (!this.listeners.isEmpty()) {
-          for (WALActionsListener i : this.listeners) {
-            i.preLogRoll(oldPath, newPath);
-          }
+      this.filenum = System.currentTimeMillis();
+      Path newPath = computeFilename();
+
+      // Tell our listeners that a new log is about to be created
+      if (!this.listeners.isEmpty()) {
+        for (WALActionsListener i : this.listeners) {
+          i.preLogRoll(oldPath, newPath);
         }
-        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-        // Can we get at the dfsclient outputstream?  If an instance of
-        // SFLW, it'll have done the necessary reflection to get at the
-        // protected field name.
-        FSDataOutputStream nextHdfsOut = null;
-        if (nextWriter instanceof SequenceFileLogWriter) {
-          nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
-        }
-
-        Path oldFile = null;
-        int oldNumEntries = 0;
-        synchronized (updateLock) {
-          // Clean up current writer.
-          oldNumEntries = this.numEntries.get();
-          oldFile = cleanupCurrentWriter(currentFilenum);
-          this.writer = nextWriter;
-          this.hdfs_out = nextHdfsOut;
-          this.numEntries.set(0);
-        }
-        LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
-          + ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
-          : "" ) + "; new path=" + FSUtils.getPath(newPath));
-
-        // Tell our listeners that a new log was created
-        if (!this.listeners.isEmpty()) {
-          for (WALActionsListener i : this.listeners) {
-            i.postLogRoll(oldPath, newPath);
-          }
+      }
+      FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+      // Can we get at the dfsclient outputstream?  If an instance of
+      // SFLW, it'll have done the necessary reflection to get at the
+      // protected field name.
+      FSDataOutputStream nextHdfsOut = null;
+      if (nextWriter instanceof SequenceFileLogWriter) {
+        nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+      }
+
+      synchronized (updateLock) {
+        // Clean up current writer.
+        Path oldFile = cleanupCurrentWriter(currentFilenum);
+        this.writer = nextWriter;
+        this.hdfs_out = nextHdfsOut;
+
+        LOG.info((oldFile != null?
+            "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+            this.numEntries.get() +
+            ", filesize=" +
+            this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+          " for " + FSUtils.getPath(newPath));
+        this.numEntries.set(0);
+      }
+      // Tell our listeners that a new log was created
+      if (!this.listeners.isEmpty()) {
+        for (WALActionsListener i : this.listeners) {
+          i.postLogRoll(oldPath, newPath);
         }
+      }
 
-        // Can we delete any of the old log files?
-        if (getNumLogFiles() > 0) {
-          cleanOldLogs();
-          regionsToFlush = getRegionsToForceFlush();
+      // Can we delete any of the old log files?
+      if (this.outputfiles.size() > 0) {
+        if (this.lastSeqWritten.isEmpty()) {
+          LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
+          // If so, then no new writes have come in since all regions were
+          // flushed (and removed from the lastSeqWritten map). Means can
+          // remove all but currently open log file.
+          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+            archiveLogFile(e.getValue(), e.getKey());
+          }
+          this.outputfiles.clear();
+        } else {
+          regionsToFlush = cleanOldLogs();
         }
-      } finally {
+      }
+    } finally {
+      try {
         this.logRollRunning = false;
-        closeBarrier.endOp();
+      } finally {
+        this.cacheFlushLock.unlock();
       }
-      return regionsToFlush;
     }
+    return regionsToFlush;
   }
 
   /**
@@ -591,64 +581,36 @@ class FSHLog implements HLog, Syncable {
    * encoded region names to flush.
    * @throws IOException
    */
-  private void cleanOldLogs() throws IOException {
-    long oldestOutstandingSeqNum = Long.MAX_VALUE;
-    synchronized (oldestSeqNumsLock) {
-      Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
-        ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
-      Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
-        ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
-      oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
-    }
-
+  private byte [][] cleanOldLogs() throws IOException {
+    Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
     // Get the set of all log files whose last sequence number is smaller than
     // the oldest edit's sequence number.
     TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
         oldestOutstandingSeqNum).keySet());
     // Now remove old log files (if any)
-    if (LOG.isDebugEnabled()) {
-      if (sequenceNumbers.size() > 0) {
-        LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
+    int logsToRemove = sequenceNumbers.size();
+    if (logsToRemove > 0) {
+      if (LOG.isDebugEnabled()) {
+        // Find associated region; helps debugging.
+        byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
+        LOG.debug("Found " + logsToRemove + " hlogs to remove" +
           " out of total " + this.outputfiles.size() + ";" +
-          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
+          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
+          " from region " + Bytes.toStringBinary(oldestRegion));
       }
-    }
-    for (Long seq : sequenceNumbers) {
-      archiveLogFile(this.outputfiles.remove(seq), seq);
-    }
-  }
-
-  /**
-   * Return regions that have edits that are equal or less than a certain sequence number.
-   * Static due to some old unit test.
-   * @param walSeqNum The sequence number to compare with.
-   * @param regionsToSeqNums Encoded region names to sequence ids
-   * @return All regions whose seqNum <= walSeqNum. Null if no regions found.
-   */
-  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
-      final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
-    List<byte[]> regions = null;
-    for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
-      if (e.getValue().longValue() <= walSeqNum) {
-        if (regions == null) regions = new ArrayList<byte[]>();
-        regions.add(e.getKey());
+      for (Long seq : sequenceNumbers) {
+        archiveLogFile(this.outputfiles.remove(seq), seq);
       }
     }
-    return regions == null ? null : regions
-        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
-  }
 
-  private byte[][] getRegionsToForceFlush() throws IOException {
     // If too many log files, figure which regions we need to flush.
     // Array is an array of encoded region names.
     byte [][] regions = null;
-    int logCount = getNumLogFiles();
+    int logCount = this.outputfiles.size();
     if (logCount > this.maxLogs && logCount > 0) {
       // This is an array of encoded region names.
-      synchronized (oldestSeqNumsLock) {
-        regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
-          this.oldestUnflushedSeqNums);
-      }
+      regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
+        this.lastSeqWritten);
       if (regions != null) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < regions.length; i++) {
@@ -664,6 +626,29 @@ class FSHLog implements HLog, Syncable {
   }
 
   /*
+   * @return Logs older than this id are safe to remove.
+   */
+  private Long getOldestOutstandingSeqNum() {
+    return Collections.min(this.lastSeqWritten.values());
+  }
+
+  /**
+   * @param oldestOutstandingSeqNum
+   * @return (Encoded) name of oldest outstanding region.
+   */
+  private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
+    byte [] oldestRegion = null;
+    for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
+      if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
+        // Key is encoded region name.
+        oldestRegion = e.getKey();
+        break;
+      }
+    }
+    return oldestRegion;
+  }
+
+  /*
    * Cleans up current writer closing and adding to outputfiles.
    * Presumes we're operating inside an updateLock scope.
    * @return Path to current writer or null if none.
@@ -795,39 +780,33 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public void close() throws IOException {
-    if (this.closed) {
-      return;
-    }
     try {
       logSyncerThread.close();
       // Make sure we synced everything
       logSyncerThread.join(this.optionalFlushInterval*2);
     } catch (InterruptedException e) {
       LOG.error("Exception while waiting for syncer thread to die", e);
-      Thread.currentThread().interrupt();
-    }
-    try {
-      // Prevent all further flushing and rolling.
-      closeBarrier.stopAndDrainOps();
-    } catch (InterruptedException e) {
-      LOG.error("Exception while waiting for cache flushes and log rolls", e);
-      Thread.currentThread().interrupt();
     }
 
-    // Tell our listeners that the log is closing
-    if (!this.listeners.isEmpty()) {
-      for (WALActionsListener i : this.listeners) {
-        i.logCloseRequested();
-      }
-    }
-    synchronized (updateLock) {
-      this.closed = true;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("closing hlog writer in " + this.dir.toString());
+    cacheFlushLock.lock();
+    try {
+      // Tell our listeners that the log is closing
+      if (!this.listeners.isEmpty()) {
+        for (WALActionsListener i : this.listeners) {
+          i.logCloseRequested();
+        }
       }
-      if (this.writer != null) {
-        this.writer.close();
+      synchronized (updateLock) {
+        this.closed = true;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing hlog writer in " + this.dir.toString());
+        }
+        if (this.writer != null) {
+          this.writer.close();
+        }
       }
+    } finally {
+      cacheFlushLock.unlock();
     }
   }
 
@@ -859,7 +838,7 @@ class FSHLog implements HLog, Syncable {
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
+      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
       txid = this.unflushedEntries.incrementAndGet();
@@ -931,7 +910,7 @@ class FSHLog implements HLog, Syncable {
         // Use encoded name.  Its shorter, guaranteed unique and a subset of
         // actual  name.
         byte [] encodedRegionName = info.getEncodedNameAsBytes();
-        this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
+        this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
@@ -1063,11 +1042,7 @@ class FSHLog implements HLog, Syncable {
     Writer tempWriter;
     synchronized (this.updateLock) {
       if (this.closed) return;
-      // Guaranteed non-null.
-      // Note that parallel sync can close tempWriter.
-      // The current method of dealing with this is to catch exceptions.
-      // See HBASE-4387, HBASE-5623, HBASE-7329.
-      tempWriter = this.writer;
+      tempWriter = this.writer; // guaranteed non-null
     }
     // if the transaction that we are interested in is already 
     // synced, then return immediately.
@@ -1103,11 +1078,9 @@ class FSHLog implements HLog, Syncable {
       }
       try {
         tempWriter.sync();
-      } catch(IOException ex) {
+      } catch(IOException io) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
-          // TODO: we don't actually need to do it for concurrent close - what is the point
-          //       of syncing new unrelated writer? Keep behavior for now.
           tempWriter = this.writer;
           tempWriter.sync();
         }
@@ -1115,9 +1088,6 @@ class FSHLog implements HLog, Syncable {
       this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
 
       this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
-      // TODO: preserving the old behavior for now, but this check is strange. It's not
-      //       protected by any locks here, so for all we know rolling locks might start
-      //       as soon as we enter the "if". Is this best-effort optimization check?
       if (!this.logRollRunning) {
         checkLowReplication();
         try {
@@ -1280,61 +1250,107 @@ class FSHLog implements HLog, Syncable {
     return outputfiles.size();
   }
 
-  @Override
-  public Long startCacheFlush(final byte[] encodedRegionName) {
-    Long oldRegionSeqNum = null;
-    if (!closeBarrier.beginOp()) {
-      return null;
+  private byte[] getSnapshotName(byte[] encodedRegionName) {
+    byte snp[] = new byte[encodedRegionName.length + 3];
+    // an encoded region name has only hex digits. s, n or p are not hex
+    // and therefore snapshot-names will never collide with
+    // encoded-region-names
+    snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
+    for (int i = 0; i < encodedRegionName.length; i++) {
+      snp[i+3] = encodedRegionName[i];
     }
-    synchronized (oldestSeqNumsLock) {
-      oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
-      if (oldRegionSeqNum != null) {
-        Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
-        assert oldValue == null : "Flushing map not cleaned up for "
-          + Bytes.toString(encodedRegionName);
-      }
-    }
-    if (oldRegionSeqNum == null) {
-      // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
-      //       the region is already flushing (which would make this call invalid), or there
-      //       were no appends after last flush, so why are we starting flush? Maybe we should
-      //       assert not null, and switch to "long" everywhere. Less rigorous, but safer,
-      //       alternative is telling the caller to stop. For now preserve old logic.
-      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
-        + Bytes.toString(encodedRegionName) + "]");
+    return snp;
+  }
+
+  @Override
+  public long startCacheFlush(final byte[] encodedRegionName) {
+    this.cacheFlushLock.lock();
+    Long seq = this.lastSeqWritten.remove(encodedRegionName);
+    // seq is the lsn of the oldest edit associated with this region. If a
+    // snapshot already exists - because the last flush failed - then seq will
+    // be the lsn of the oldest edit in the snapshot
+    if (seq != null) {
+      // keeping the earliest sequence number of the snapshot in
+      // lastSeqWritten maintains the correctness of
+      // getOldestOutstandingSeqNum(). But it doesn't matter really because
+      // everything is being done inside of cacheFlush lock.
+      Long oldseq =
+        lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
+      if (oldseq != null) {
+        LOG.error("Logic Error Snapshot seq id from earlier flush still" +
+            " present! for region " + Bytes.toString(encodedRegionName) +
+            " overwritten oldseq=" + oldseq + "with new seq=" + seq);
+        Runtime.getRuntime().halt(1);
+      }
     }
     return obtainSeqNum();
   }
 
   @Override
-  public void completeCacheFlush(final byte [] encodedRegionName)
-  {
-    synchronized (oldestSeqNumsLock) {
-      this.oldestFlushingSeqNums.remove(encodedRegionName);
+  public void completeCacheFlush(final byte [] encodedRegionName,
+      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
+  throws IOException {
+    try {
+      if (this.closed) {
+        return;
+      }
+      long txid = 0;
+      synchronized (updateLock) {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        WALEdit edit = completeCacheFlushLogEdit();
+        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
+            System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
+        logSyncerThread.append(new Entry(key, edit));
+        txid = this.unflushedEntries.incrementAndGet();
+        long took = EnvironmentEdgeManager.currentTimeMillis() - now;
+        long len = 0;
+        for (KeyValue kv : edit.getKeyValues()) {
+          len += kv.getLength();
+        }
+        this.metrics.finishAppend(took, len);
+        this.numEntries.incrementAndGet();
+      }
+      // sync txn to file system
+      this.sync(txid);
+
+    } finally {
+      // updateLock not needed for removing snapshot's entry
+      // Cleaning up of lastSeqWritten is in the finally clause because we
+      // don't want to confuse getOldestOutstandingSeqNum()
+      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
+      this.cacheFlushLock.unlock();
     }
-    closeBarrier.endOp();
+  }
+
+  private WALEdit completeCacheFlushLogEdit() {
+    KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null,
+      System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH);
+    WALEdit e = new WALEdit();
+    e.add(kv);
+    return e;
   }
 
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
-    Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
-    synchronized (oldestSeqNumsLock) {
-      seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
-      if (seqNumBeforeFlushStarts != null) {
-        currentSeqNum =
-          this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
-      }
-    }
-    closeBarrier.endOp();
-    if ((currentSeqNum != null)
-        && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
-      String errorStr = "Region " + Bytes.toString(encodedRegionName) +
-          "acquired edits out of order current memstore seq=" + currentSeqNum
-          + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
-      LOG.error(errorStr);
-      assert false : errorStr;
-      Runtime.getRuntime().halt(1);
+    Long snapshot_seq =
+      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
+    if (snapshot_seq != null) {
+      // updateLock not necessary because we are racing against
+      // lastSeqWritten.putIfAbsent() in append() and we will always win
+      // before releasing cacheFlushLock make sure that the region's entry in
+      // lastSeqWritten points to the earliest edit in the region
+      Long current_memstore_earliest_seq =
+        this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
+      if (current_memstore_earliest_seq != null &&
+          (current_memstore_earliest_seq.longValue() <=
+            snapshot_seq.longValue())) {
+        LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) +
+            "acquired edits out of order current memstore seq=" +
+            current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
+        Runtime.getRuntime().halt(1);
+      }
     }
+    this.cacheFlushLock.unlock();
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Jan 21 21:36:25 2013
@@ -162,14 +162,14 @@ public interface HLog {
     }
   }
 
-  /**
+  /*
    * registers WALActionsListener
    * 
    * @param listener
    */
   public void registerWALActionsListener(final WALActionsListener listener);
 
-  /**
+  /*
    * unregisters WALActionsListener
    * 
    * @param listener
@@ -200,10 +200,18 @@ public interface HLog {
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    * 
+   * Because a log cannot be rolled during a cache flush, and a cache flush
+   * spans two method calls, a special lock needs to be obtained so that a cache
+   * flush cannot start when the log is being rolled and the log cannot be
+   * rolled during a cache flush.
+   * 
    * <p>
-   * The implementation is synchronized in order to make sure there's one rollWriter
-   * running at any given time.
-   *
+   * Note that this method cannot be synchronized. Otherwise startCacheFlush
+   * could run and obtain the cacheFlushLock; this method could then start,
+   * obtain the lock on this, and block on the cacheFlushLock; completeCacheFlush
+   * would then wait for the lock on this and consequently never release the
+   * cacheFlushLock (a deadlock).
+   * 
    * @return If lots of logs, flush the returned regions so next time through we
    *         can clean logs. Returns null if nothing to flush. Names are actual
    *         region names as returned by {@link HRegionInfo#getEncodedName()}
@@ -215,9 +223,17 @@ public interface HLog {
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    * 
+   * Because a log cannot be rolled during a cache flush, and a cache flush
+   * spans two method calls, a special lock needs to be obtained so that a cache
+   * flush cannot start when the log is being rolled and the log cannot be
+   * rolled during a cache flush.
+   * 
    * <p>
-   * The implementation is synchronized in order to make sure there's one rollWriter
-   * running at any given time.
+   * Note that this method cannot be synchronized. Otherwise startCacheFlush
+   * could run and obtain the cacheFlushLock; this method could then start,
+   * obtain the lock on this, and block on the cacheFlushLock; completeCacheFlush
+   * would then wait for the lock on this and consequently never release the
+   * cacheFlushLock (a deadlock).
    * 
    * @param force
    *          If true, force creation of a new writer even if no entries have
@@ -321,33 +337,53 @@ public interface HLog {
   public long obtainSeqNum();
 
   /**
-   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
-   * in order to be able to do cleanup. This method tells WAL that some region is about
-   * to flush memstore.
-   *
-   * We stash the oldest seqNum for the region, and let the the next edit inserted in this
-   * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
-   * as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
-   * in case of flush succeeding, the seqNum of that first edit after start becomes the
-   * valid oldest seqNum for this region.
-   *
-   * @return current seqNum, to pass on to flushers (who will put it into the metadata of
-   *         the resulting file as an upper-bound seqNum for that file), or NULL if flush
-   *         should not be started.
+   * By acquiring a log sequence ID, we can allow log messages to continue while
+   * we flush the cache.
+   * 
+   * Acquire a lock so that we do not roll the log between the start and
+   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
+   * not appear in the correct logfile.
+   * 
+   * Ensuring that flushes and log-rolls don't happen concurrently also allows
+   * us to temporarily put a log-seq-number in lastSeqWritten against the region
+   * being flushed that might not be the earliest in-memory log-seq-number for
+   * that region. By the time the flush is completed or aborted and before the
+   * cacheFlushLock is released it is ensured that lastSeqWritten again has the
+   * oldest in-memory edit's lsn for the region that was being flushed.
+   * 
+   * In this method, by removing the entry in lastSeqWritten for the region
+   * being flushed we ensure that the next edit inserted in this region will be
+   * correctly recorded in
+   * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}. The
+   * earliest in-memory lsn - which is now in the memstore snapshot - is saved
+   * temporarily in the lastSeqWritten map while the flush is active.
+   * 
+   * @return sequence ID to pass to
+   *         {@link #completeCacheFlush(byte[], byte[], long, boolean)} when
+   *         completing the flush
+   * @see #completeCacheFlush(byte[], byte[], long, boolean)
+   * @see #abortCacheFlush(byte[])
    */
-  public Long startCacheFlush(final byte[] encodedRegionName);
+  public long startCacheFlush(final byte[] encodedRegionName);
 
   /**
-   * Complete the cache flush.
-   * @param encodedRegionName Encoded region name.
+   * Complete the cache flush
+   * 
+   * Protected by cacheFlushLock
+   * 
+   * @param encodedRegionName
+   * @param tableName
+   * @param logSeqId
+   * @throws IOException
    */
-  public void completeCacheFlush(final byte[] encodedRegionName);
+  public void completeCacheFlush(final byte[] encodedRegionName,
+      final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
+      throws IOException;
 
   /**
    * Abort a cache flush. Call if the flush fails. Note that the only recovery
    * for an aborted flush currently is a restart of the regionserver so the
-   * snapshot content dropped by the failure gets restored to the memstore.v
-   * @param encodedRegionName Encoded region name.
+   * snapshot content dropped by the failure gets restored to the memstore.
    */
   public void abortCacheFlush(byte[] encodedRegionName);
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Mon Jan 21 21:36:25 2013
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.util.Byte
 public class HLogUtil {
   static final Log LOG = LogFactory.getLog(HLogUtil.class);
 
+  static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
+
   /**
    * @param family
    * @return true if the column is a meta column
@@ -242,6 +244,32 @@ public class HLogUtil {
   }
 
   /**
+   * Return regions (memstores) that have edits that are equal or less than the
+   * passed <code>oldestWALseqid</code>.
+   * 
+   * @param oldestWALseqid
+   * @param regionsToSeqids
+   *          Encoded region names to sequence ids
+   * @return Regions whose seqid is equal to or less than <code>oldestWALseqid</code>
+   *         (not necessarily in order). Null if no regions found.
+   */
+  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
+      final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
+    // This method is static so that it can be unit tested more easily.
+    List<byte[]> regions = null;
+    for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
+      if (e.getValue().longValue() <= oldestWALseqid) {
+        if (regions == null)
+          regions = new ArrayList<byte[]>();
+        // Key is encoded region name.
+        regions.add(e.getKey());
+      }
+    }
+    return regions == null ? null : regions
+        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
+  }
+
+  /**
    * Returns sorted set of edit files made by wal-log splitter, excluding files
    * with '.temp' suffix.
    * 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Mon Jan 21 21:36:25 2013
@@ -244,12 +244,7 @@ public class SequenceFileLogWriter imple
 
   @Override
   public void sync() throws IOException {
-    try {
-      this.writer.syncFs();
-    } catch (NullPointerException npe) {
-      // Concurrent close...
-      throw new IOException(npe);
-    }
+    this.writer.syncFs();
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Mon Jan 21 21:36:25 2013
@@ -323,11 +323,11 @@ public class TestHLog  {
       regionsToSeqids.put(l.toString().getBytes(), l);
     }
     byte [][] regions =
-      FSHLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
+      HLogUtil.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
     assertEquals(2, regions.length);
     assertTrue(Bytes.equals(regions[0], "0".getBytes()) ||
         Bytes.equals(regions[0], "1".getBytes()));
-    regions = FSHLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
+    regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
     int count = 4;
     assertEquals(count, regions.length);
     // Regions returned are not ordered.
@@ -518,8 +518,9 @@ public class TestHLog  {
       htd.addFamily(new HColumnDescriptor("column"));
 
       log.append(info, tableName, cols, System.currentTimeMillis(), htd);
-      log.startCacheFlush(info.getEncodedNameAsBytes());
-      log.completeCacheFlush(info.getEncodedNameAsBytes());
+      long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes());
+      log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId,
+          info.isMetaRegion());
       log.close();
       Path filename = ((FSHLog) log).computeFilename();
       log = null;
@@ -539,6 +540,20 @@ public class TestHLog  {
         assertEquals((byte)(i + '0'), kv.getValue()[0]);
         System.out.println(key + " " + val);
       }
+      HLog.Entry entry = null;
+      while ((entry = reader.next(null)) != null) {
+        HLogKey key = entry.getKey();
+        WALEdit val = entry.getEdit();
+        // Assert only one more row... the meta flushed row.
+        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
+        assertTrue(Bytes.equals(tableName, key.getTablename()));
+        KeyValue kv = val.getKeyValues().get(0);
+        assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
+        assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
+        assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
+          val.getKeyValues().get(0).getValue()));
+        System.out.println(key + " " + val);
+      }
     } finally {
       if (log != null) {
         log.closeAndDelete();
@@ -574,8 +589,8 @@ public class TestHLog  {
       HTableDescriptor htd = new HTableDescriptor();
       htd.addFamily(new HColumnDescriptor("column"));
       log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes());
+      long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
       log.close();
       Path filename = ((FSHLog) log).computeFilename();
       log = null;
@@ -593,6 +608,20 @@ public class TestHLog  {
         System.out.println(entry.getKey() + " " + val);
         idx++;
       }
+
+      // Get next row... the meta flushed row.
+      entry = reader.next();
+      assertEquals(1, entry.getEdit().size());
+      for (KeyValue val : entry.getEdit().getKeyValues()) {
+        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
+          entry.getKey().getEncodedRegionName()));
+        assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
+        assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
+        assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
+        assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
+          val.getValue()));
+        System.out.println(entry.getKey() + " " + val);
+      }
     } finally {
       if (log != null) {
         log.closeAndDelete();
@@ -676,19 +705,17 @@ public class TestHLog  {
       assertEquals(3, ((FSHLog) log).getNumLogFiles());
 
       // Flush the first region, we expect to see the first two files getting
-      // archived. We need to append something or writer won't be rolled.
-      addEdits(log, hri2, tableName2, 1);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes());
+      // archived
+      long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false);
       log.rollWriter();
       assertEquals(2, ((FSHLog) log).getNumLogFiles());
 
       // Flush the second region, which removes all the remaining output files
       // since the oldest was completely flushed and the two others only contain
       // flush information
-      addEdits(log, hri2, tableName2, 1);
-      log.startCacheFlush(hri2.getEncodedNameAsBytes());
-      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+      seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false);
       log.rollWriter();
       assertEquals(0, ((FSHLog) log).getNumLogFiles());
     } finally {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java Mon Jan 21 21:36:25 2013
@@ -117,7 +117,7 @@ public class TestWALActionsListener {
     assertEquals(11, observer.postLogRollCounter);
     assertEquals(5, laterobserver.preLogRollCounter);
     assertEquals(5, laterobserver.postLogRollCounter);
-    assertEquals(1, observer.closedCount);
+    assertEquals(2, observer.closedCount);
   }
 
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1436633&r1=1436632&r2=1436633&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Mon Jan 21 21:36:25 2013
@@ -557,8 +557,8 @@ public class TestWALReplay {
     }
 
     // Add a cache flush, shouldn't have any effect
-    wal.startCacheFlush(regionName);
-    wal.completeCacheFlush(regionName);
+    long logSeqId = wal.startCacheFlush(regionName);
+    wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
 
     // Add an edit to another family, should be skipped.
     WALEdit edit = new WALEdit();
@@ -661,7 +661,7 @@ public class TestWALReplay {
     wal.doCompleteCacheFlush = true;
     // allow complete cache flush with the previous seq number got after first
     // set of edits.
-    wal.completeCacheFlush(hri.getEncodedNameAsBytes());
+    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
     HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
@@ -686,11 +686,12 @@ public class TestWALReplay {
     }
 
     @Override
-    public void completeCacheFlush(byte[] encodedRegionName) {
+    public void completeCacheFlush(byte[] encodedRegionName, byte[] tableName, long logSeqId,
+        boolean isMetaRegion) throws IOException {
       if (!doCompleteCacheFlush) {
         return;
       }
-      super.completeCacheFlush(encodedRegionName);
+      super.completeCacheFlush(encodedRegionName, tableName, logSeqId, isMetaRegion);
     }
   }