You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/21 22:33:37 UTC

svn commit: r1436632 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-common/src/test/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/main/java/org/apach...

Author: tedyu
Date: Mon Jan 21 21:33:37 2013
New Revision: 1436632

URL: http://svn.apache.org/viewvc?rev=1436632&view=rev
Log:
HBASE-7329 remove flush-related records from WAL and make locking more granular (Sergey)


Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java?rev=1436632&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java Mon Jan 21 21:33:37 2013
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A simple barrier that can be used by classes that need to wait for some operations to
+ * finish before stopping/closing/etc. forever.
+ */
+public class DrainBarrier {
+  /**
+   * Contains the number of outstanding operations, as well as flags.
+   * Initially, the number of operations is 1. Each beginOp increments, and endOp decrements it.
+   * beginOp does not proceed when it sees the draining flag. When stop is called, it atomically
+   * decrements the number of operations (the initial 1) and sets the draining flag. If stop did
+   * the decrement to zero, that means there are no more operations outstanding, so stop is done.
+   * Otherwise, stop blocks, and the endOp that decrements the count to 0 unblocks it.
+   */
+  private final AtomicLong valueAndFlags = new AtomicLong(inc(0));
+  private final static long DRAINING_FLAG = 0x1;
+  private final static int FLAG_BIT_COUNT = 1;
+
+  /**
+   * Tries to start an operation.
+   * @return false iff the stop is in progress, and the operation cannot be started.
+   */
+  public boolean beginOp() {
+    long oldValAndFlags;
+    do {
+      oldValAndFlags = valueAndFlags.get();
+      if (isDraining(oldValAndFlags)) return false;
+    } while (!valueAndFlags.compareAndSet(oldValAndFlags, inc(oldValAndFlags)));
+    return true;
+  }
+
+  /**
+   * Ends the operation. Unblocks the blocked caller of stop, if necessary.
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+      justification="First, we do change the state before notify, 2nd, it doesn't even matter")
+  public void endOp() {
+    long oldValAndFlags;
+    do {
+      oldValAndFlags = valueAndFlags.get();
+      long unacceptableCount = isDraining(oldValAndFlags) ? 0 : 1;
+      if (getValue(oldValAndFlags) == unacceptableCount) {
+        throw new AssertionError("endOp called without corresponding beginOp call ("
+          + "the current count is " + unacceptableCount + ")");
+      }
+    } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags)));
+    if (getValue(oldValAndFlags) == 1) {
+      synchronized (this) { this.notifyAll(); }
+    }
+  }
+
+  /**
+   * Blocks new operations from starting, waits for the current ones to drain.
+   * If someone already called it, returns immediately, which is currently unavoidable as
+   * most of the users stop and close things right and left, and hope for the best.
+   * stopAndDrainOpsOnce asserts instead.
+   * @throws InterruptedException the wait for operations has been interrupted.
+   */
+  public void stopAndDrainOps() throws InterruptedException {
+    stopAndDrainOps(true);
+  }
+
+  /**
+   * Blocks new operations from starting, waits for the current ones to drain.
+   * Can only be called once.
+   * @throws InterruptedException the wait for operations has been interrupted.
+   */
+  public void stopAndDrainOpsOnce() throws InterruptedException {
+    stopAndDrainOps(false);
+  }
+
+  /**
+   * @param ignoreRepeatedCalls If this is true and somebody already called stop, this method
+   *                            will return immediately; if this is false and somebody
+   *                            already called stop, it will assert.
+   */
+  // Justification for warnings - wait is not unconditional, and contrary to what WA_NOT_IN_LOOP
+  // description says we are not waiting on multiple conditions.
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"})
+  private void stopAndDrainOps(boolean ignoreRepeatedCalls) throws InterruptedException {
+    long oldValAndFlags;
+    do {
+      oldValAndFlags = valueAndFlags.get();
+      if (isDraining(oldValAndFlags)) {
+        if (ignoreRepeatedCalls) return;
+        throw new AssertionError("stopAndWaitForOpsOnce called more than once");
+      }
+    } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags) | DRAINING_FLAG));
+    if (getValue(oldValAndFlags) == 1) return; // There were no operations outstanding.
+    synchronized (this) { this.wait(); }
+  }
+
+  // Helper methods.
+  private static final boolean isDraining(long valueAndFlags) {
+    return (valueAndFlags & DRAINING_FLAG) == DRAINING_FLAG;
+  }
+
+  private static final long getValue(long valueAndFlags) {
+    return valueAndFlags >> FLAG_BIT_COUNT;
+  }
+
+  private static final long inc(long valueAndFlags) {
+    return valueAndFlags + (1 << FLAG_BIT_COUNT); // Not checking for overflow.
+  }
+
+  private static final long dec(long valueAndFlags) {
+    return valueAndFlags - (1 << FLAG_BIT_COUNT); // Negative overflow checked outside.
+  }
+}

Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java?rev=1436632&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java Mon Jan 21 21:33:37 2013
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+@Category(SmallTests.class)
+public class TestDrainBarrier {
+
+  @Test
+  public void testBeginEndStopWork() throws Exception {
+    DrainBarrier barrier = new DrainBarrier();
+    assertTrue(barrier.beginOp());
+    assertTrue(barrier.beginOp());
+    barrier.endOp();
+    barrier.endOp();
+    barrier.stopAndDrainOps();
+    assertFalse(barrier.beginOp());
+  }
+
+  @Test
+  public void testUnmatchedEndAssert() throws Exception {
+    DrainBarrier barrier = new DrainBarrier();
+    try {
+      barrier.endOp();
+      fail("Should have asserted");
+    } catch (AssertionError e) {
+    }
+
+    barrier.beginOp();
+    barrier.beginOp();
+    barrier.endOp();
+    barrier.endOp();
+    try {
+      barrier.endOp();
+      fail("Should have asserted");
+    } catch (AssertionError e) {
+    }
+  }
+
+  @Test
+  public void testStopWithoutOpsDoesntBlock() throws Exception {
+    DrainBarrier barrier = new DrainBarrier();
+    barrier.stopAndDrainOpsOnce();
+
+    barrier = new DrainBarrier();
+    barrier.beginOp();
+    barrier.endOp();
+    barrier.stopAndDrainOpsOnce();
+  }
+
+  @Test
+  /** This test tests blocking and can have false positives in very bad timing cases. */
+  public void testStopIsBlockedByOps() throws Exception {
+    final DrainBarrier barrier = new DrainBarrier();
+    barrier.beginOp();
+    barrier.beginOp();
+    barrier.beginOp();
+    barrier.endOp();
+
+    Thread stoppingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          barrier.stopAndDrainOpsOnce();
+        } catch (InterruptedException e) {
+          fail("Should not have happened");
+        }
+      }
+    });
+    stoppingThread.start();
+
+    // First "end" should not unblock the thread, but the second should.
+    barrier.endOp();
+    stoppingThread.join(1000);
+    assertTrue(stoppingThread.isAlive());
+    barrier.endOp();
+    stoppingThread.join(30000); // When not broken, will be a very fast wait; set safe value.
+    assertFalse(stoppingThread.isAlive());
+  }
+
+  @Test
+  public void testMultipleStopOnceAssert() throws Exception {
+    DrainBarrier barrier = new DrainBarrier();
+    barrier.stopAndDrainOpsOnce();
+    try {
+      barrier.stopAndDrainOpsOnce();
+      fail("Should have asserted");
+    } catch (AssertionError e) {
+    }
+  }
+
+  @Test
+  public void testMultipleSloppyStopsHaveNoEffect() throws Exception {
+    DrainBarrier barrier = new DrainBarrier();
+    barrier.stopAndDrainOps();
+    barrier.stopAndDrainOps();
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Mon Jan 21 21:33:37 2013
@@ -126,7 +126,7 @@ public class WALPlayer extends Configure
           Delete del = null;
           KeyValue lastKV = null;
           for (KeyValue kv : value.getKeyValues()) {
-            // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
+            // filtering HLog meta entries
             if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
 
             // A WALEdit may contain multiple operations (HBASE-3584) and/or

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Jan 21 21:33:37 2013
@@ -1553,17 +1553,26 @@ public class HRegion implements HeapSize
     long flushsize = this.memstoreSize.get();
     status.setStatus("Preparing to flush by snapshotting stores");
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
-    long completeSeqId = -1L;
+    long flushSeqId = -1L;
     try {
       // Record the mvcc for all transactions in progress.
       w = mvcc.beginMemstoreInsert();
       mvcc.advanceMemstore(w);
 
-      sequenceId = (wal == null)? myseqid:
-        wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
-      completeSeqId = this.getCompleteCacheFlushSequenceId(sequenceId);
+      if (wal != null) {
+        Long startSeqId = wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
+        if (startSeqId == null) {
+          status.setStatus("Flush will not be started for [" + this.regionInfo.getEncodedName()
+              + "] - WAL is going away");
+          return false;
+        }
+        flushSeqId = startSeqId.longValue();
+      } else {
+        flushSeqId = myseqid;
+      }
+
       for (Store s : stores.values()) {
-        storeFlushers.add(s.getStoreFlusher(completeSeqId));
+        storeFlushers.add(s.getStoreFlusher(flushSeqId));
       }
 
       // prepare flush (take a snapshot)
@@ -1632,22 +1641,14 @@ public class HRegion implements HeapSize
       throw dse;
     }
 
-    // If we get to here, the HStores have been written. If we get an
-    // error in completeCacheFlush it will release the lock it is holding
-
-    // B.  Write a FLUSHCACHE-COMPLETE message to the log.
-    //     This tells future readers that the HStores were emitted correctly,
-    //     and that all updates to the log for this regionName that have lower
-    //     log-sequence-ids can be safely ignored.
+    // If we get to here, the HStores have been written.
     if (wal != null) {
-      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
-        regionInfo.getTableName(), completeSeqId,
-        this.getRegionInfo().isMetaRegion());
+      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes());
     }
 
     // Update the last flushed sequence id for region
     if (this.rsServices != null) {
-      completeSequenceId = completeSeqId;
+      completeSequenceId = flushSeqId;
     }
 
     // C. Finally notify anyone waiting on memstore to clear:
@@ -1672,18 +1673,6 @@ public class HRegion implements HeapSize
     return compactionRequested;
   }
 
-   /**
-   * Get the sequence number to be associated with this cache flush. Used by
-   * TransactionalRegion to not complete pending transactions.
-   *
-   *
-   * @param currentSequenceId
-   * @return sequence id to complete the cache flush with
-   */
-  protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
-    return currentSequenceId;
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Mon Jan 21 21:33:37 2013
@@ -24,8 +24,10 @@ import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -132,23 +135,45 @@ class FSHLog implements HLog, Syncable {
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
-  /*
+  /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
+  private DrainBarrier closeBarrier = new DrainBarrier();
+
+  /**
    * Current log file.
    */
   Writer writer;
 
-  /*
+  /**
    * Map of all log files but the current one.
    */
   final SortedMap<Long, Path> outputfiles =
     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
-  /*
-   * Map of encoded region names to their most recent sequence/edit id in their
-   * memstore.
+
+  /**
+   * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
+   * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
+   * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
+   * force flush them, so we don't care about these numbers messing with anything. */
+  private final Object oldestSeqNumsLock = new Object();
+
+  /**
+   * This lock makes sure only one log roll runs at the same time. Should not be taken while
+   * any other lock is held. We don't just use synchronized because that results in bogus and
+   * tedious findbugs warning when it thinks synchronized controls writer thread safety */
+  private final Object rollWriterLock = new Object();
+
+  /**
+   * Map of encoded region names to the oldest unflushed sequence/edit id in their memstore.
    */
-  private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
+  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+  /**
+   * Map of encoded region names to the oldest sequence/edit id in their memstore;
+   * contains the regions that are currently flushing. That way we can store two numbers for
+   * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region.
+   */
+  private final Map<byte[], Long> oldestFlushingSeqNums = new HashMap<byte[], Long>();
 
   private volatile boolean closed = false;
 
@@ -178,10 +203,6 @@ class FSHLog implements HLog, Syncable {
   // of the default Hdfs block size.
   private final long logrollsize;
 
-  // This lock prevents starting a log roll during a cache flush.
-  // synchronized is insufficient because a cache flush spans two method calls.
-  private final Lock cacheFlushLock = new ReentrantLock();
-
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   // locked during appends
@@ -472,88 +493,77 @@ class FSHLog implements HLog, Syncable {
   @Override
   public byte [][] rollWriter(boolean force)
       throws FailedLogCloseException, IOException {
-    // Return if nothing to flush.
-    if (!force && this.writer != null && this.numEntries.get() <= 0) {
-      return null;
-    }
-    byte [][] regionsToFlush = null;
-    this.cacheFlushLock.lock();
-    try {
-      this.logRollRunning = true;
-      if (closed) {
-        LOG.debug("HLog closed.  Skipping rolling of writer");
-        return regionsToFlush;
-      }
-      // Do all the preparation outside of the updateLock to block
-      // as less as possible the incoming writes
-      long currentFilenum = this.filenum;
-      Path oldPath = null;
-      if (currentFilenum > 0) {
-        //computeFilename  will take care of meta hlog filename
-        oldPath = computeFilename(currentFilenum);
+    synchronized (rollWriterLock) {
+      // Return if nothing to flush.
+      if (!force && this.writer != null && this.numEntries.get() <= 0) {
+        return null;
       }
-      this.filenum = System.currentTimeMillis();
-      Path newPath = computeFilename();
-
-      // Tell our listeners that a new log is about to be created
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.preLogRoll(oldPath, newPath);
+      byte [][] regionsToFlush = null;
+      try {
+        this.logRollRunning = true;
+        boolean isClosed = closed;
+        if (isClosed || !closeBarrier.beginOp()) {
+          LOG.debug("HLog " + (isClosed ? "closed" : "closing") + ". Skipping rolling of writer");
+          return regionsToFlush;
+        }
+        // Do all the preparation outside of the updateLock to block
+        // as less as possible the incoming writes
+        long currentFilenum = this.filenum;
+        Path oldPath = null;
+        if (currentFilenum > 0) {
+          //computeFilename  will take care of meta hlog filename
+          oldPath = computeFilename(currentFilenum);
+        }
+        this.filenum = System.currentTimeMillis();
+        Path newPath = computeFilename();
+
+        // Tell our listeners that a new log is about to be created
+        if (!this.listeners.isEmpty()) {
+          for (WALActionsListener i : this.listeners) {
+            i.preLogRoll(oldPath, newPath);
+          }
         }
-      }
-      FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-      // Can we get at the dfsclient outputstream?  If an instance of
-      // SFLW, it'll have done the necessary reflection to get at the
-      // protected field name.
-      FSDataOutputStream nextHdfsOut = null;
-      if (nextWriter instanceof SequenceFileLogWriter) {
-        nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
-      }
-
-      synchronized (updateLock) {
-        // Clean up current writer.
-        Path oldFile = cleanupCurrentWriter(currentFilenum);
-        this.writer = nextWriter;
-        this.hdfs_out = nextHdfsOut;
-
-        LOG.info((oldFile != null?
-            "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
-            this.numEntries.get() +
-            ", filesize=" +
-            this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
-          " for " + FSUtils.getPath(newPath));
-        this.numEntries.set(0);
-      }
-      // Tell our listeners that a new log was created
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.postLogRoll(oldPath, newPath);
+        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+        // Can we get at the dfsclient outputstream?  If an instance of
+        // SFLW, it'll have done the necessary reflection to get at the
+        // protected field name.
+        FSDataOutputStream nextHdfsOut = null;
+        if (nextWriter instanceof SequenceFileLogWriter) {
+          nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+        }
+
+        Path oldFile = null;
+        int oldNumEntries = 0;
+        synchronized (updateLock) {
+          // Clean up current writer.
+          oldNumEntries = this.numEntries.get();
+          oldFile = cleanupCurrentWriter(currentFilenum);
+          this.writer = nextWriter;
+          this.hdfs_out = nextHdfsOut;
+          this.numEntries.set(0);
+        }
+        LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
+          + ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
+          : "" ) + "; new path=" + FSUtils.getPath(newPath));
+
+        // Tell our listeners that a new log was created
+        if (!this.listeners.isEmpty()) {
+          for (WALActionsListener i : this.listeners) {
+            i.postLogRoll(oldPath, newPath);
+          }
         }
-      }
 
-      // Can we delete any of the old log files?
-      if (this.outputfiles.size() > 0) {
-        if (this.lastSeqWritten.isEmpty()) {
-          LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
-          // If so, then no new writes have come in since all regions were
-          // flushed (and removed from the lastSeqWritten map). Means can
-          // remove all but currently open log file.
-          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-            archiveLogFile(e.getValue(), e.getKey());
-          }
-          this.outputfiles.clear();
-        } else {
-          regionsToFlush = cleanOldLogs();
+        // Can we delete any of the old log files?
+        if (getNumLogFiles() > 0) {
+          cleanOldLogs();
+          regionsToFlush = getRegionsToForceFlush();
         }
-      }
-    } finally {
-      try {
-        this.logRollRunning = false;
       } finally {
-        this.cacheFlushLock.unlock();
+        this.logRollRunning = false;
+        closeBarrier.endOp();
       }
+      return regionsToFlush;
     }
-    return regionsToFlush;
   }
 
   /**
@@ -581,36 +591,64 @@ class FSHLog implements HLog, Syncable {
    * encoded region names to flush.
    * @throws IOException
    */
-  private byte [][] cleanOldLogs() throws IOException {
-    Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
+  private void cleanOldLogs() throws IOException {
+    long oldestOutstandingSeqNum = Long.MAX_VALUE;
+    synchronized (oldestSeqNumsLock) {
+      Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
+        ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
+      Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
+        ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
+      oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
+    }
+
     // Get the set of all log files whose last sequence number is smaller than
     // the oldest edit's sequence number.
     TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
         oldestOutstandingSeqNum).keySet());
     // Now remove old log files (if any)
-    int logsToRemove = sequenceNumbers.size();
-    if (logsToRemove > 0) {
-      if (LOG.isDebugEnabled()) {
-        // Find associated region; helps debugging.
-        byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-        LOG.debug("Found " + logsToRemove + " hlogs to remove" +
+    if (LOG.isDebugEnabled()) {
+      if (sequenceNumbers.size() > 0) {
+        LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
           " out of total " + this.outputfiles.size() + ";" +
-          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
-          " from region " + Bytes.toStringBinary(oldestRegion));
+          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
       }
-      for (Long seq : sequenceNumbers) {
-        archiveLogFile(this.outputfiles.remove(seq), seq);
+    }
+    for (Long seq : sequenceNumbers) {
+      archiveLogFile(this.outputfiles.remove(seq), seq);
+    }
+  }
+
+  /**
+   * Return regions that have edits that are equal or less than a certain sequence number.
+   * Static due to some old unit test.
+   * @param walSeqNum The sequence number to compare with.
+   * @param regionsToSeqNums Encoded region names to sequence ids
+   * @return All regions whose seqNum <= walSeqNum. Null if no regions found.
+   */
+  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
+      final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
+    List<byte[]> regions = null;
+    for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
+      if (e.getValue().longValue() <= walSeqNum) {
+        if (regions == null) regions = new ArrayList<byte[]>();
+        regions.add(e.getKey());
       }
     }
+    return regions == null ? null : regions
+        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
+  }
 
+  private byte[][] getRegionsToForceFlush() throws IOException {
     // If too many log files, figure which regions we need to flush.
     // Array is an array of encoded region names.
     byte [][] regions = null;
-    int logCount = this.outputfiles.size();
+    int logCount = getNumLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
       // This is an array of encoded region names.
-      regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
-        this.lastSeqWritten);
+      synchronized (oldestSeqNumsLock) {
+        regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
+          this.oldestUnflushedSeqNums);
+      }
       if (regions != null) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < regions.length; i++) {
@@ -626,29 +664,6 @@ class FSHLog implements HLog, Syncable {
   }
 
   /*
-   * @return Logs older than this id are safe to remove.
-   */
-  private Long getOldestOutstandingSeqNum() {
-    return Collections.min(this.lastSeqWritten.values());
-  }
-
-  /**
-   * @param oldestOutstandingSeqNum
-   * @return (Encoded) name of oldest outstanding region.
-   */
-  private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
-    byte [] oldestRegion = null;
-    for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
-      if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
-        // Key is encoded region name.
-        oldestRegion = e.getKey();
-        break;
-      }
-    }
-    return oldestRegion;
-  }
-
-  /*
    * Cleans up current writer closing and adding to outputfiles.
    * Presumes we're operating inside an updateLock scope.
    * @return Path to current writer or null if none.
@@ -780,33 +795,39 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
     try {
       logSyncerThread.close();
       // Make sure we synced everything
       logSyncerThread.join(this.optionalFlushInterval*2);
     } catch (InterruptedException e) {
       LOG.error("Exception while waiting for syncer thread to die", e);
+      Thread.currentThread().interrupt();
     }
-
-    cacheFlushLock.lock();
     try {
-      // Tell our listeners that the log is closing
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.logCloseRequested();
-        }
+      // Prevent all further flushing and rolling.
+      closeBarrier.stopAndDrainOps();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for cache flushes and log rolls", e);
+      Thread.currentThread().interrupt();
+    }
+
+    // Tell our listeners that the log is closing
+    if (!this.listeners.isEmpty()) {
+      for (WALActionsListener i : this.listeners) {
+        i.logCloseRequested();
       }
-      synchronized (updateLock) {
-        this.closed = true;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("closing hlog writer in " + this.dir.toString());
-        }
-        if (this.writer != null) {
-          this.writer.close();
-        }
+    }
+    synchronized (updateLock) {
+      this.closed = true;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing hlog writer in " + this.dir.toString());
+      }
+      if (this.writer != null) {
+        this.writer.close();
       }
-    } finally {
-      cacheFlushLock.unlock();
     }
   }
 
@@ -838,7 +859,7 @@ class FSHLog implements HLog, Syncable {
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
+      this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
       txid = this.unflushedEntries.incrementAndGet();
@@ -910,7 +931,7 @@ class FSHLog implements HLog, Syncable {
         // Use encoded name.  Its shorter, guaranteed unique and a subset of
         // actual  name.
         byte [] encodedRegionName = info.getEncodedNameAsBytes();
-        this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
+        this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
@@ -1042,7 +1063,11 @@ class FSHLog implements HLog, Syncable {
     Writer tempWriter;
     synchronized (this.updateLock) {
       if (this.closed) return;
-      tempWriter = this.writer; // guaranteed non-null
+      // Guaranteed non-null.
+      // Note that parallel sync can close tempWriter.
+      // The current method of dealing with this is to catch exceptions.
+      // See HBASE-4387, HBASE-5623, HBASE-7329.
+      tempWriter = this.writer;
     }
     // if the transaction that we are interested in is already 
     // synced, then return immediately.
@@ -1078,9 +1103,11 @@ class FSHLog implements HLog, Syncable {
       }
       try {
         tempWriter.sync();
-      } catch(IOException io) {
+      } catch(IOException ex) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
+          // TODO: we don't actually need to do it for concurrent close - what is the point
+          //       of syncing new unrelated writer? Keep behavior for now.
           tempWriter = this.writer;
           tempWriter.sync();
         }
@@ -1088,6 +1115,9 @@ class FSHLog implements HLog, Syncable {
       this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
 
       this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
+      // TODO: preserving the old behavior for now, but this check is strange. It's not
+      //       protected by any locks here, so for all we know rolling locks might start
+      //       as soon as we enter the "if". Is this a best-effort optimization check?
       if (!this.logRollRunning) {
         checkLowReplication();
         try {
@@ -1250,107 +1280,61 @@ class FSHLog implements HLog, Syncable {
     return outputfiles.size();
   }
 
-  private byte[] getSnapshotName(byte[] encodedRegionName) {
-    byte snp[] = new byte[encodedRegionName.length + 3];
-    // an encoded region name has only hex digits. s, n or p are not hex
-    // and therefore snapshot-names will never collide with
-    // encoded-region-names
-    snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
-    for (int i = 0; i < encodedRegionName.length; i++) {
-      snp[i+3] = encodedRegionName[i];
-    }
-    return snp;
-  }
-
   @Override
-  public long startCacheFlush(final byte[] encodedRegionName) {
-    this.cacheFlushLock.lock();
-    Long seq = this.lastSeqWritten.remove(encodedRegionName);
-    // seq is the lsn of the oldest edit associated with this region. If a
-    // snapshot already exists - because the last flush failed - then seq will
-    // be the lsn of the oldest edit in the snapshot
-    if (seq != null) {
-      // keeping the earliest sequence number of the snapshot in
-      // lastSeqWritten maintains the correctness of
-      // getOldestOutstandingSeqNum(). But it doesn't matter really because
-      // everything is being done inside of cacheFlush lock.
-      Long oldseq =
-        lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
-      if (oldseq != null) {
-        LOG.error("Logic Error Snapshot seq id from earlier flush still" +
-            " present! for region " + Bytes.toString(encodedRegionName) +
-            " overwritten oldseq=" + oldseq + "with new seq=" + seq);
-        Runtime.getRuntime().halt(1);
-      }
+  public Long startCacheFlush(final byte[] encodedRegionName) {
+    Long oldRegionSeqNum = null;
+    if (!closeBarrier.beginOp()) {
+      return null;
+    }
+    synchronized (oldestSeqNumsLock) {
+      oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
+      if (oldRegionSeqNum != null) {
+        Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
+        assert oldValue == null : "Flushing map not cleaned up for "
+          + Bytes.toString(encodedRegionName);
+      }
+    }
+    if (oldRegionSeqNum == null) {
+      // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
+      //       the region is already flushing (which would make this call invalid), or there
+      //       were no appends after last flush, so why are we starting flush? Maybe we should
+      //       assert not null, and switch to "long" everywhere. Less rigorous, but safer,
+      //       alternative is telling the caller to stop. For now preserve old logic.
+      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+        + Bytes.toString(encodedRegionName) + "]");
     }
     return obtainSeqNum();
   }
 
   @Override
-  public void completeCacheFlush(final byte [] encodedRegionName,
-      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
-  throws IOException {
-    try {
-      if (this.closed) {
-        return;
-      }
-      long txid = 0;
-      synchronized (updateLock) {
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        WALEdit edit = completeCacheFlushLogEdit();
-        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
-            System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-        logSyncerThread.append(new Entry(key, edit));
-        txid = this.unflushedEntries.incrementAndGet();
-        long took = EnvironmentEdgeManager.currentTimeMillis() - now;
-        long len = 0;
-        for (KeyValue kv : edit.getKeyValues()) {
-          len += kv.getLength();
-        }
-        this.metrics.finishAppend(took, len);
-        this.numEntries.incrementAndGet();
-      }
-      // sync txn to file system
-      this.sync(txid);
-
-    } finally {
-      // updateLock not needed for removing snapshot's entry
-      // Cleaning up of lastSeqWritten is in the finally clause because we
-      // don't want to confuse getOldestOutstandingSeqNum()
-      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
-      this.cacheFlushLock.unlock();
+  public void completeCacheFlush(final byte [] encodedRegionName)
+  {
+    synchronized (oldestSeqNumsLock) {
+      this.oldestFlushingSeqNums.remove(encodedRegionName);
     }
-  }
-
-  private WALEdit completeCacheFlushLogEdit() {
-    KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null,
-      System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH);
-    WALEdit e = new WALEdit();
-    e.add(kv);
-    return e;
+    closeBarrier.endOp();
   }
 
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
-    Long snapshot_seq =
-      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
-    if (snapshot_seq != null) {
-      // updateLock not necessary because we are racing against
-      // lastSeqWritten.putIfAbsent() in append() and we will always win
-      // before releasing cacheFlushLock make sure that the region's entry in
-      // lastSeqWritten points to the earliest edit in the region
-      Long current_memstore_earliest_seq =
-        this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
-      if (current_memstore_earliest_seq != null &&
-          (current_memstore_earliest_seq.longValue() <=
-            snapshot_seq.longValue())) {
-        LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) +
-            "acquired edits out of order current memstore seq=" +
-            current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
-        Runtime.getRuntime().halt(1);
-      }
+    Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
+    synchronized (oldestSeqNumsLock) {
+      seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
+      if (seqNumBeforeFlushStarts != null) {
+        currentSeqNum =
+          this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
+      }
+    }
+    closeBarrier.endOp();
+    if ((currentSeqNum != null)
+        && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
+      String errorStr = "Region " + Bytes.toString(encodedRegionName) +
+          "acquired edits out of order current memstore seq=" + currentSeqNum
+          + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
+      LOG.error(errorStr);
+      assert false : errorStr;
+      Runtime.getRuntime().halt(1);
     }
-    this.cacheFlushLock.unlock();
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Jan 21 21:33:37 2013
@@ -162,14 +162,14 @@ public interface HLog {
     }
   }
 
-  /*
+  /**
    * registers WALActionsListener
    * 
    * @param listener
    */
   public void registerWALActionsListener(final WALActionsListener listener);
 
-  /*
+  /**
    * unregisters WALActionsListener
    * 
    * @param listener
@@ -200,18 +200,10 @@ public interface HLog {
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    * 
-   * Because a log cannot be rolled during a cache flush, and a cache flush
-   * spans two method calls, a special lock needs to be obtained so that a cache
-   * flush cannot start when the log is being rolled and the log cannot be
-   * rolled during a cache flush.
-   * 
    * <p>
-   * Note that this method cannot be synchronized because it is possible that
-   * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
-   * start which would obtain the lock on this but block on obtaining the
-   * cacheFlushLock and then completeCacheFlush could be called which would wait
-   * for the lock on this and consequently never release the cacheFlushLock
-   * 
+   * The implementation is synchronized in order to make sure there's one rollWriter
+   * running at any given time.
+   *
    * @return If lots of logs, flush the returned regions so next time through we
    *         can clean logs. Returns null if nothing to flush. Names are actual
    *         region names as returned by {@link HRegionInfo#getEncodedName()}
@@ -223,17 +215,9 @@ public interface HLog {
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    * 
-   * Because a log cannot be rolled during a cache flush, and a cache flush
-   * spans two method calls, a special lock needs to be obtained so that a cache
-   * flush cannot start when the log is being rolled and the log cannot be
-   * rolled during a cache flush.
-   * 
    * <p>
-   * Note that this method cannot be synchronized because it is possible that
-   * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
-   * start which would obtain the lock on this but block on obtaining the
-   * cacheFlushLock and then completeCacheFlush could be called which would wait
-   * for the lock on this and consequently never release the cacheFlushLock
+   * The implementation is synchronized in order to make sure there's one rollWriter
+   * running at any given time.
    * 
    * @param force
    *          If true, force creation of a new writer even if no entries have
@@ -337,53 +321,33 @@ public interface HLog {
   public long obtainSeqNum();
 
   /**
-   * By acquiring a log sequence ID, we can allow log messages to continue while
-   * we flush the cache.
-   * 
-   * Acquire a lock so that we do not roll the log between the start and
-   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
-   * not appear in the correct logfile.
-   * 
-   * Ensuring that flushes and log-rolls don't happen concurrently also allows
-   * us to temporarily put a log-seq-number in lastSeqWritten against the region
-   * being flushed that might not be the earliest in-memory log-seq-number for
-   * that region. By the time the flush is completed or aborted and before the
-   * cacheFlushLock is released it is ensured that lastSeqWritten again has the
-   * oldest in-memory edit's lsn for the region that was being flushed.
-   * 
-   * In this method, by removing the entry in lastSeqWritten for the region
-   * being flushed we ensure that the next edit inserted in this region will be
-   * correctly recorded in
-   * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The
-   * lsn of the earliest in-memory lsn - which is now in the memstore snapshot -
-   * is saved temporarily in the lastSeqWritten map while the flush is active.
-   * 
-   * @return sequence ID to pass
-   *         {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[],
-   *         byte[], long)}
-   * @see #completeCacheFlush(byte[], byte[], long, boolean)
-   * @see #abortCacheFlush(byte[])
+   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
+   * in order to be able to do cleanup. This method tells WAL that some region is about
+   * to flush memstore.
+   *
+   * We stash the oldest seqNum for the region, and let the next edit inserted in this
+   * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
+   * as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
+   * in case of flush succeeding, the seqNum of that first edit after start becomes the
+   * valid oldest seqNum for this region.
+   *
+   * @return current seqNum, to pass on to flushers (who will put it into the metadata of
+   *         the resulting file as an upper-bound seqNum for that file), or NULL if flush
+   *         should not be started.
    */
-  public long startCacheFlush(final byte[] encodedRegionName);
+  public Long startCacheFlush(final byte[] encodedRegionName);
 
   /**
-   * Complete the cache flush
-   * 
-   * Protected by cacheFlushLock
-   * 
-   * @param encodedRegionName
-   * @param tableName
-   * @param logSeqId
-   * @throws IOException
+   * Complete the cache flush.
+   * @param encodedRegionName Encoded region name.
    */
-  public void completeCacheFlush(final byte[] encodedRegionName,
-      final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
-      throws IOException;
+  public void completeCacheFlush(final byte[] encodedRegionName);
 
   /**
    * Abort a cache flush. Call if the flush fails. Note that the only recovery
    * for an aborted flush currently is a restart of the regionserver so the
-   * snapshot content dropped by the failure gets restored to the memstore.
+   * snapshot content dropped by the failure gets restored to the memstore.
+   * @param encodedRegionName Encoded region name.
    */
   public void abortCacheFlush(byte[] encodedRegionName);
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Mon Jan 21 21:33:37 2013
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.util.Byte
 public class HLogUtil {
   static final Log LOG = LogFactory.getLog(HLogUtil.class);
 
-  static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
-
   /**
    * @param family
    * @return true if the column is a meta column
@@ -244,32 +242,6 @@ public class HLogUtil {
   }
 
   /**
-   * Return regions (memstores) that have edits that are equal or less than the
-   * passed <code>oldestWALseqid</code>.
-   * 
-   * @param oldestWALseqid
-   * @param regionsToSeqids
-   *          Encoded region names to sequence ids
-   * @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
-   *         necessarily in order). Null if no regions found.
-   */
-  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
-      final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
-    // This method is static so it can be unit tested the easier.
-    List<byte[]> regions = null;
-    for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
-      if (e.getValue().longValue() <= oldestWALseqid) {
-        if (regions == null)
-          regions = new ArrayList<byte[]>();
-        // Key is encoded region name.
-        regions.add(e.getKey());
-      }
-    }
-    return regions == null ? null : regions
-        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
-  }
-
-  /**
    * Returns sorted set of edit files made by wal-log splitter, excluding files
    * with '.temp' suffix.
    * 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Mon Jan 21 21:33:37 2013
@@ -244,7 +244,12 @@ public class SequenceFileLogWriter imple
 
   @Override
   public void sync() throws IOException {
-    this.writer.syncFs();
+    try {
+      this.writer.syncFs();
+    } catch (NullPointerException npe) {
+      // Concurrent close...
+      throw new IOException(npe);
+    }
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Mon Jan 21 21:33:37 2013
@@ -323,11 +323,11 @@ public class TestHLog  {
       regionsToSeqids.put(l.toString().getBytes(), l);
     }
     byte [][] regions =
-      HLogUtil.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
+      FSHLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
     assertEquals(2, regions.length);
     assertTrue(Bytes.equals(regions[0], "0".getBytes()) ||
         Bytes.equals(regions[0], "1".getBytes()));
-    regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
+    regions = FSHLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
     int count = 4;
     assertEquals(count, regions.length);
     // Regions returned are not ordered.
@@ -518,9 +518,8 @@ public class TestHLog  {
       htd.addFamily(new HColumnDescriptor("column"));
 
       log.append(info, tableName, cols, System.currentTimeMillis(), htd);
-      long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes());
-      log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId,
-          info.isMetaRegion());
+      log.startCacheFlush(info.getEncodedNameAsBytes());
+      log.completeCacheFlush(info.getEncodedNameAsBytes());
       log.close();
       Path filename = ((FSHLog) log).computeFilename();
       log = null;
@@ -540,20 +539,6 @@ public class TestHLog  {
         assertEquals((byte)(i + '0'), kv.getValue()[0]);
         System.out.println(key + " " + val);
       }
-      HLog.Entry entry = null;
-      while ((entry = reader.next(null)) != null) {
-        HLogKey key = entry.getKey();
-        WALEdit val = entry.getEdit();
-        // Assert only one more row... the meta flushed row.
-        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
-        assertTrue(Bytes.equals(tableName, key.getTablename()));
-        KeyValue kv = val.getKeyValues().get(0);
-        assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
-        assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-        assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
-          val.getKeyValues().get(0).getValue()));
-        System.out.println(key + " " + val);
-      }
     } finally {
       if (log != null) {
         log.closeAndDelete();
@@ -589,8 +574,8 @@ public class TestHLog  {
       HTableDescriptor htd = new HTableDescriptor();
       htd.addFamily(new HColumnDescriptor("column"));
       log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
-      long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
+      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.close();
       Path filename = ((FSHLog) log).computeFilename();
       log = null;
@@ -608,20 +593,6 @@ public class TestHLog  {
         System.out.println(entry.getKey() + " " + val);
         idx++;
       }
-
-      // Get next row... the meta flushed row.
-      entry = reader.next();
-      assertEquals(1, entry.getEdit().size());
-      for (KeyValue val : entry.getEdit().getKeyValues()) {
-        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
-          entry.getKey().getEncodedRegionName()));
-        assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
-        assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
-        assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-        assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
-          val.getValue()));
-        System.out.println(entry.getKey() + " " + val);
-      }
     } finally {
       if (log != null) {
         log.closeAndDelete();
@@ -705,17 +676,19 @@ public class TestHLog  {
       assertEquals(3, ((FSHLog) log).getNumLogFiles());
 
       // Flush the first region, we expect to see the first two files getting
-      // archived
-      long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false);
+      // archived. We need to append something or writer won't be rolled.
+      addEdits(log, hri2, tableName2, 1);
+      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(2, ((FSHLog) log).getNumLogFiles());
 
       // Flush the second region, which removes all the remaining output files
       // since the oldest was completely flushed and the two others only contain
       // flush information
-      seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes());
-      log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false);
+      addEdits(log, hri2, tableName2, 1);
+      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(0, ((FSHLog) log).getNumLogFiles());
     } finally {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java Mon Jan 21 21:33:37 2013
@@ -117,7 +117,7 @@ public class TestWALActionsListener {
     assertEquals(11, observer.postLogRollCounter);
     assertEquals(5, laterobserver.preLogRollCounter);
     assertEquals(5, laterobserver.postLogRollCounter);
-    assertEquals(2, observer.closedCount);
+    assertEquals(1, observer.closedCount);
   }
 
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1436632&r1=1436631&r2=1436632&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Mon Jan 21 21:33:37 2013
@@ -557,8 +557,8 @@ public class TestWALReplay {
     }
 
     // Add a cache flush, shouldn't have any effect
-    long logSeqId = wal.startCacheFlush(regionName);
-    wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
+    wal.startCacheFlush(regionName);
+    wal.completeCacheFlush(regionName);
 
     // Add an edit to another family, should be skipped.
     WALEdit edit = new WALEdit();
@@ -661,7 +661,7 @@ public class TestWALReplay {
     wal.doCompleteCacheFlush = true;
     // allow complete cache flush with the previous seq number got after first
     // set of edits.
-    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
+    wal.completeCacheFlush(hri.getEncodedNameAsBytes());
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
     HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
@@ -686,12 +686,11 @@ public class TestWALReplay {
     }
 
     @Override
-    public void completeCacheFlush(byte[] encodedRegionName, byte[] tableName, long logSeqId,
-        boolean isMetaRegion) throws IOException {
+    public void completeCacheFlush(byte[] encodedRegionName) {
       if (!doCompleteCacheFlush) {
         return;
       }
-      super.completeCacheFlush(encodedRegionName, tableName, logSeqId, isMetaRegion);
+      super.completeCacheFlush(encodedRegionName);
     }
   }