Posted to commits@hbase.apache.org by st...@apache.org on 2010/07/17 00:28:21 UTC

svn commit: r964965 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: stack
Date: Fri Jul 16 22:28:21 2010
New Revision: 964965

URL: http://svn.apache.org/viewvc?rev=964965&view=rev
Log:
HBASE-2727 Splits writing one file only is untenable; need dir of recovered edits ordered by sequenceid

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Jul 16 22:28:21 2010
@@ -435,6 +435,8 @@ Release 0.21.0 - Unreleased
                (Nicolas Spiegelberg via Stack)
    HBASE-2781  ZKW.createUnassignedRegion doesn't make sure existing znode is 
                in the right state (Karthik Ranganathan via JD)
+   HBASE-2727  Splits writing one file only is untenable; need dir of recovered
+               edits ordered by sequenceid
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Jul 16 22:28:21 2010
@@ -19,6 +19,27 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,28 +85,6 @@ import org.apache.hadoop.util.StringUtil
 
 import com.google.common.collect.Lists;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Constructor;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
  * for each row. A given table consists of one or more HRegions.
@@ -126,6 +125,7 @@ public class HRegion implements HeapSize
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
+
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -330,9 +330,8 @@ public class HRegion implements HeapSize
     // Remove temporary data left over from old regions
     cleanupTmpDir();
     
-    // Load in all the HStores.  Get min and max seqids across all families.
+    // Load in all the HStores.  Get maximum seqid.
     long maxSeqId = -1;
-    long minSeqId = Integer.MAX_VALUE;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
       Store store = instantiateHStore(this.tableDir, c);
       this.stores.put(c.getName(), store);
@@ -340,12 +339,9 @@ public class HRegion implements HeapSize
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
-      if (minSeqId > storeSeqId) {
-        minSeqId = storeSeqId;
-      }
     }
     // Recover any edits if available.
-    long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
+    maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
 
     // Get rid of any splits or merges that were lost in-progress.  Clean out
     // these directories here on open.  We may be opening a region that was
@@ -362,7 +358,7 @@ public class HRegion implements HeapSize
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
-    long nextSeqid = Math.max(seqid, maxSeqId) + 1;
+    long nextSeqid = maxSeqId + 1;
     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
     return nextSeqid;
   }
@@ -902,7 +898,9 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Flushing the cache is a little tricky. We have a lot of updates in the
+   * Flush the memstore.
+   * 
+   * Flushing the memstore is a little tricky. We have a lot of updates in the
    * memstore, all of which have also been written to the log. We need to
    * write those updates in the memstore out to disk, while being able to
    * process reads/writes as much as possible during the flush operation. Also,
@@ -934,6 +932,19 @@ public class HRegion implements HeapSize
    * because a Snapshot was not properly persisted.
    */
   protected boolean internalFlushcache() throws IOException {
+    return internalFlushcache(this.log, -1);
+  }
+
+  /**
+   * @param wal Null if we're NOT to go via hlog/wal.
+   * @param myseqid The seqid to use if <code>wal</code> is null when writing
+   * out the flush file.
+   * @return true if the region needs compacting
+   * @throws IOException
+   * @see #internalFlushcache()
+   */
+  protected boolean internalFlushcache(final HLog wal, final long myseqid)
+  throws IOException {
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
@@ -945,7 +956,8 @@ public class HRegion implements HeapSize
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for region " + this +
         ". Current region memstore size " +
-          StringUtils.humanReadableInt(this.memstoreSize.get()));
+        StringUtils.humanReadableInt(this.memstoreSize.get()) +
+        ((wal != null)? "": "; wal is null, using passed myseqid=" + myseqid));
     }
 
     // Stop updates while we snapshot the memstore of all stores. We only have
@@ -958,14 +970,14 @@ public class HRegion implements HeapSize
     long sequenceId = -1L;
     long completeSequenceId = -1L;
 
-    // we have to take a write lock during snapshot, or else a write could
+    // We have to take a write lock during snapshot, or else a write could
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
     this.updatesLock.writeLock().lock();
     final long currentMemStoreSize = this.memstoreSize.get();
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
-      sequenceId = log.startCacheFlush();
+      sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
 
       for (Store s : stores.values()) {
@@ -1009,9 +1021,9 @@ public class HRegion implements HeapSize
       }
 
       try {
-	if (atomicWork != null) {
-	  atomicWork.call();
-	}
+        if (atomicWork != null) {
+          atomicWork.call();
+        }
 
         // Switch snapshot (in memstore) -> new hfile (thus causing
         // all the store scanners to reset/reseek).
@@ -1038,7 +1050,7 @@ public class HRegion implements HeapSize
       // We used to only catch IOEs but its possible that we'd get other
       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
       // all and sundry.
-      this.log.abortCacheFlush();
+      if (wal != null) wal.abortCacheFlush();
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);
@@ -1052,9 +1064,11 @@ public class HRegion implements HeapSize
     //     This tells future readers that the HStores were emitted correctly,
     //     and that all updates to the log for this regionName that have lower
     //     log-sequence-ids can be safely ignored.
-    this.log.completeCacheFlush(getRegionName(),
+    if (wal != null) {
+      wal.completeCacheFlush(getRegionName(),
         regionInfo.getTableDesc().getName(), completeSequenceId,
         this.getRegionInfo().isMetaRegion());
+    }
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -1067,12 +1081,12 @@ public class HRegion implements HeapSize
       LOG.info("Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
         this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
-        ", compaction requested=" + compactionRequested);
+        ", compaction requested=" + compactionRequested +
+        ((wal == null)? "; wal=null": ""));
     }
     return compactionRequested;
   }
 
-
    /**
    * A hook for subclasses wishing to perform operations prior to the cache
     * flush commit stage.
@@ -1853,46 +1867,75 @@ public class HRegion implements HeapSize
   * Read the edits log put under this region by the wal log splitting process.
   * Put the recovered edits back up into this region.
    *
-   * We can ignore any log message that has a sequence ID that's equal to or
+   * <p>We can ignore any log message that has a sequence ID that's equal to or
    * lower than minSeqId.  (Because we know such log messages are already
    * reflected in the HFiles.)
+   * 
+   * <p>While this is running we are putting pressure on memory yet we are
+   * outside of our usual accounting because we are not yet an onlined region
+   * (this stuff is being run as part of Region initialization).  This means
+   * that if we're up against global memory limits, we'll not be flagged to flush
+   * because we are not online. We can't be flushed by usual mechanisms anyway;
+   * we're not yet online, so our relative sequenceids are not yet aligned with
+   * HLog sequenceids -- not till we come online, after processing of the split
+   * edits.
+   * 
+   * <p>But to help relieve memory pressure, we at least manage our own heap
+   * size, flushing if we are in excess of the per-region limit.  Flushing,
+   * though, we have to be careful to avoid using the regionserver/hlog
+   * sequenceid.  It runs on a different line to what is going on here in this
+   * region context, so if we crashed replaying these edits but in the midst had
+   * a flush that used the regionserver log with a sequenceid in excess of what
+   * is in this region and its split editlogs, we could miss edits the next
+   * time we go to recover.  So, we have to flush inline, using seqids that make
+   * sense in this single region context only -- until we come online.
+   * 
    * @param regiondir
-   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-   * must be larger than this to be replayed.
+   * @param minSeqId Any edit found in the split editlogs needs to be in excess
+   * of this minSeqId to be applied, else it is skipped.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log, or -1 if no log recovered
+   * recovered edits log, or <code>minSeqId</code> if nothing was added from the editlogs.
    * @throws UnsupportedEncodingException
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
       final long minSeqId, final Progressable reporter)
   throws UnsupportedEncodingException, IOException {
-    Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
-    if (edits == null || !this.fs.exists(edits)) return -1;
-    if (isZeroLengthThenDelete(this.fs, edits)) return -1;
-    long maxSeqIdInLog = -1;
-    try {
-      maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
-      LOG.debug("Deleting recovered edits file: " + edits);
-      if (!this.fs.delete(edits, false)) {
-        LOG.error("Failed delete of " + edits);
-      }
-    } catch (IOException e) {
-      boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
-      if (skipErrors) {
-        Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
-          System.currentTimeMillis());
-        LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
-          " as " + moveAsideName, e);
-        if (!this.fs.rename(edits, moveAsideName)) {
-          LOG.error("hbase.skip.errors=true so continuing. Rename failed");
+    long seqid = minSeqId;
+    NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
+    if (files == null || files.isEmpty()) return seqid;
+    for (Path edits: files) {
+      if (edits == null || !this.fs.exists(edits)) {
+        LOG.warn("Null or non-existent edits file: " + edits);
+        continue;
+      }
+      if (isZeroLengthThenDelete(this.fs, edits)) continue;
+      try {
+        seqid = replayRecoveredEdits(edits, seqid, reporter);
+      } catch (IOException e) {
+        boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+        if (skipErrors) {
+          Path p = HLog.moveAsideBadEditsFile(fs, edits);
+          LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
+            " as " + p, e);
+        } else {
+          throw e;
+        }
+      }
+    }
+    if (seqid > minSeqId) {
+      // Then we added some edits to memory. Flush and clean up split edit files.
+      internalFlushcache(null, seqid);
+      for (Path file: files) {
+        if (!this.fs.delete(file, false)) {
+          LOG.error("Failed delete of " + file);
+        } else {
+          LOG.debug("Deleted recovered.edits file=" + file);
         }
-      } else {
-        throw e;
       }
     }
-    return maxSeqIdInLog;
+    return seqid;
   }
 
   /*
@@ -1901,12 +1944,13 @@ public class HRegion implements HeapSize
    * must be larger than this to be replayed.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log, or -1 if no log recovered
+   * recovered edits log, or <code>minSeqId</code> if nothing was added from the editlogs.
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
       final long minSeqId, final Progressable reporter)
     throws IOException {
+    LOG.info("Replaying edits from " + edits + "; minSeqId=" + minSeqId);
     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
     try {
       return replayRecoveredEdits(reader, minSeqId, reporter);
@@ -1916,26 +1960,22 @@ public class HRegion implements HeapSize
   }
 
  /* @param reader Reader against file of recovered edits.
-  * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-  * must be larger than this to be replayed.
+  * @param minSeqId Any edit found in the split editlogs needs to be in excess
+  * of this minSeqId to be applied, else it is skipped.
   * @param reporter
   * @return the sequence id of the last edit added to this region out of the
-  * recovered edits log, or -1 if no log recovered
+  * recovered edits log, or <code>minSeqId</code> if nothing was added from the editlogs.
   * @throws IOException
   */
   private long replayRecoveredEdits(final HLog.Reader reader,
     final long minSeqId, final Progressable reporter)
   throws IOException {
-    long currentEditSeqId = -1;
+    long currentEditSeqId = minSeqId;
     long firstSeqIdInLog = -1;
     long skippedEdits = 0;
     long editsCount = 0;
     HLog.Entry entry;
     Store store = null;
-    // Get map of family name to maximum sequence id.  Do it here up front
-    // because as we progress, the sequence id can change if we happen to flush
-    // The flush ups the seqid for the Store.  The new seqid can cause us skip edits.
-    Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
     // How many edits to apply before we send a progress report.
     int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
     while ((entry = reader.next()) != null) {
@@ -1945,12 +1985,13 @@ public class HRegion implements HeapSize
         firstSeqIdInLog = key.getLogSeqNum();
       }
       // Now, figure if we should skip this edit.
-      currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
-      if (key.getLogSeqNum() <= minSeqId) {
+      if (key.getLogSeqNum() <= currentEditSeqId) {
         skippedEdits++;
         continue;
       }
-      for (KeyValue kv : val.getKeyValues()) {
+      currentEditSeqId = key.getLogSeqNum();
+      boolean flush = false;
+      for (KeyValue kv: val.getKeyValues()) {
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         if (kv.matchingFamily(HLog.METAFAMILY) ||
@@ -1969,16 +2010,13 @@ public class HRegion implements HeapSize
           skippedEdits++;
           continue;
         }
-        // The edits' id has to be in excess of the original max seqid of the
-        // targeted store.
-        long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
-        if (currentEditSeqId < storeMaxSeqId) {
-          skippedEdits++;
-          continue;
-        }
-        restoreEdit(kv);
+        // Once we are over the limit, restoreEdit will keep returning true to
+        // flush -- but don't flush until we've played all the kvs that make up
+        // the WALEdit.
+        flush = restoreEdit(store, kv);
         editsCount++;
      }
+     if (flush) internalFlushcache(null, currentEditSeqId);
 
       // Every 'interval' edits, tell the reporter we're making progress.
       // Have seen 60k edits taking 3minutes to complete.
@@ -1994,34 +2032,14 @@ public class HRegion implements HeapSize
     return currentEditSeqId;
   }
 
-  /*
-   * @param stores
-   * @return Map of family name to maximum sequenceid.
-   */
-  private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
-    Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte [], Store> e: stores.entrySet()) {
-      map.put(e.getKey(), e.getValue().getMaxSequenceId());
-    }
-    return map;
-  }
-
-  /*
-   * @param kv Apply this value to this region.
-   * @throws IOException
+  /**
+   * Used by tests.
+   * @param s Store to add edit to.
+   * @param kv KeyValue to add.
+   * @return True if we should flush.
    */
-  // This method is protected so can be called from tests.
-  protected void restoreEdit(final KeyValue kv) throws IOException {
-    // This is really expensive to do per edit.  Loads of object creation.
-    // TODO: Optimization.  Apply edits batched by family.
-    Map<byte [], List<KeyValue>> map =
-      new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
-    map.put(kv.getFamily(), Collections.singletonList(kv));
-    if (kv.isDelete()) {
-      delete(map, true);
-    } else {
-      put(map, true);
-    }
+  protected boolean restoreEdit(final Store s, final KeyValue kv) {
+    return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
   }
 
   /*

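The HRegion hunks above replace the old single-file replay (and its per-family
max-seqid map) with a loop over a sorted set of recovered.edits files: entries
at or below the running sequenceid are skipped, surviving KeyValues go straight
into the memstore via restoreEdit, and once the per-region flush size is
exceeded the region flushes inline with a null wal and a region-local seqid.
The following self-contained sketch -- not the committed code; edits are
modeled as plain {seqid, size} pairs -- illustrates just that control flow:

    import java.util.Arrays;
    import java.util.List;

    public class ReplaySketch {
      static final long FLUSH_SIZE = 64;  // stand-in for the per-region flush size

      public static void main(String[] args) {
        // Two split-edit "files", already in order as getSplitEditFilesSorted
        // guarantees; each edit is {seqid, payload size}.
        List<long[][]> files = Arrays.asList(
            new long[][] {{3, 40}, {4, 40}},
            new long[][] {{2, 10}, {5, 40}});   // seqid 2 is stale, gets skipped
        long minSeqId = 2;                      // max seqid already in store files

        long seqid = minSeqId;
        long memstore = 0;
        for (long[][] file : files) {
          for (long[] edit : file) {
            if (edit[0] <= seqid) continue;     // already reflected in an HFile
            seqid = edit[0];
            memstore += edit[1];                // restoreEdit: add to memstore
            if (memstore > FLUSH_SIZE) {        // inline flush; wal == null, so a
              System.out.println("flush at seqid " + seqid);  // region-local seqid
              memstore = 0;                     // names the flush file
            }
          }
        }
        System.out.println("next sequenceid = " + (seqid + 1));
      }
    }

Run as written, this prints a flush at seqid 4 and "next sequenceid = 6",
mirroring the region open path's new nextSeqid = maxSeqId + 1.
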
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Jul 16 22:28:21 2010
@@ -36,7 +36,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -61,6 +62,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -119,9 +121,17 @@ import com.google.common.util.concurrent
  */
 public class HLog implements Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
-  private static final String HLOG_DATFILE = "hlog.dat.";
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
+
+  /*
+   * Name of directory that holds recovered edits written by the wal log
+   * splitting code, one per region
+   */
+  private static final String RECOVERED_EDITS_DIR = "recovered.edits";
+  private static final Pattern EDITFILES_NAME_PATTERN =
+    Pattern.compile("-?[0-9]+");
+  
   private final FileSystem fs;
   private final Path dir;
   private final Configuration conf;
@@ -144,11 +154,6 @@ public class HLog implements Syncable {
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
-  /** Name of file that holds recovered edits written by the wal log splitting
-   * code, one per region
-   */
-  public static final String RECOVERED_EDITS = "recovered.edits";
-
   // used to indirectly tell syncFs to force the sync
   private boolean forceSync = false;
 
@@ -1459,7 +1464,7 @@ public class HLog implements Syncable {
     NamingThreadFactory f  = new NamingThreadFactory(
             "SplitWriter-%1$d", Executors.defaultThreadFactory());
     ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
-    for (final byte[] region : splitLogsMap.keySet()) {
+    for (final byte [] region : splitLogsMap.keySet()) {
       Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
       writeFutureResult.put(region, threadPool.submit(splitter));
     }
@@ -1579,17 +1584,19 @@ public class HLog implements Syncable {
           WriterAndPath wap = logWriters.get(region);
           for (Entry logEntry: entries) {
             if (wap == null) {
-              Path logFile = getRegionLogPath(logEntry, rootDir);
-              if (fs.exists(logFile)) {
-                LOG.warn("Found existing old hlog file. It could be the result of a previous" +
-                        "failed split attempt. Deleting " + logFile +
-                        ", length=" + fs.getFileStatus(logFile).getLen());
-                fs.delete(logFile, false);
+              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+              if (fs.exists(regionedits)) {
+                LOG.warn("Found existing old edits file. It could be the " +
+                  "result of a previous failed split attempt. Deleting " +
+                  regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
+                if (!fs.delete(regionedits, false)) {
+                  LOG.warn("Failed delete of old " + regionedits);
+                }
               }
-              Writer w = createWriter(fs, logFile, conf);
-              wap = new WriterAndPath(logFile, w);
+              Writer w = createWriter(fs, regionedits, conf);
+              wap = new WriterAndPath(regionedits, w);
               logWriters.put(region, wap);
-              LOG.debug("Creating writer path=" + logFile +
+              LOG.debug("Creating writer path=" + regionedits +
                 " region=" + Bytes.toStringBinary(region));
             }
             wap.w.append(logEntry);
@@ -1643,14 +1650,101 @@ public class HLog implements Syncable {
     }
   }
 
-  private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
-    Path tableDir =
-      HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
-    Path regionDir =
-            HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
-    return new Path(regionDir, RECOVERED_EDITS);
+  /*
+   * Path to a file under the RECOVERED_EDITS_DIR directory of the region found
+   * in <code>logEntry</code>, named for the sequenceid in the passed
+   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/0000000000000002332.
+   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+   * creating it if necessary.
+   * @param fs
+   * @param logEntry
+   * @param rootDir HBase root dir.
+   * @return Path to file into which to dump split log edits.
+   * @throws IOException
+   */
+  private static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir)
+  throws IOException {
+    Path tableDir = HTableDescriptor.getTableDir(rootDir,
+      logEntry.getKey().getTablename());
+    Path regiondir = HRegion.getRegionDir(tableDir,
+      HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+    Path dir = getRegionDirRecoveredEditsDir(regiondir);
+    if (!fs.exists(dir)) {
+      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+    }
+    return new Path(dir,
+      formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
    }
 
+  static String formatRecoveredEditsFileName(final long seqid) {
+    return String.format("%019d", seqid);
+  }
+
+
+  /**
+   * Returns the sorted set of edit files made by the wal-log splitter.
+   * @param fs
+   * @param regiondir
+   * @return Files in passed <code>regiondir</code> as a sorted set.
+   * @throws IOException
+   */
+  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
+      final Path regiondir)
+  throws IOException {
+    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
+      @Override
+      public boolean accept(Path p) {
+        boolean result = false;
+        try {
+          // Return files and only files that match the editfile names pattern.
+          // There can be other files in this directory other than edit files.
+          // In particular, on error, we'll move aside the bad edit file giving
+          // it a timestamp suffix.  See moveAsideBadEditsFile.
+          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+          result = fs.isFile(p) && m.matches();
+        } catch (IOException e) {
+          LOG.warn("Failed isFile check on " + p);
+        }
+        return result;
+      }
+    });
+    NavigableSet<Path> filesSorted = new TreeSet<Path>();
+    if (files == null) return filesSorted;
+    for (FileStatus status: files) {
+      filesSorted.add(status.getPath());
+    }
+    return filesSorted;
+  }
+
+  /**
+   * Move aside a bad edits file.
+   * @param fs
+   * @param edits Edits file to move aside.
+   * @return The name of the moved aside file.
+   * @throws IOException
+   */
+  public static Path moveAsideBadEditsFile(final FileSystem fs,
+      final Path edits)
+  throws IOException {
+    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
+      System.currentTimeMillis());
+    if (!fs.rename(edits, moveAsideName)) {
+      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+    }
+    return moveAsideName;
+  }
+
+  /**
+   * @param regiondir This region's directory in the filesystem.
+   * @return The directory that holds recovered edits files for the region
+   * <code>regiondir</code>
+   */
+  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
+    return new Path(regiondir, RECOVERED_EDITS_DIR);
+  }
+
   /**
    *
    * @param visitor

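Two details in the HLog hunks above are easy to miss: getSplitEditFilesSorted
relies on TreeSet's lexicographic Path ordering, which matches numeric
sequenceid order only because formatRecoveredEditsFileName zero-pads to 19
digits, and EDITFILES_NAME_PATTERN ("-?[0-9]+") is what keeps files moved aside
by moveAsideBadEditsFile -- which gain a "." plus timestamp suffix -- from ever
being picked up by a later replay. A quick standalone check, with plain strings
standing in for Paths:

    import java.util.TreeSet;
    import java.util.regex.Pattern;

    public class EditFileNames {
      public static void main(String[] args) {
        // Same pattern and format string as the HLog hunk above.
        Pattern editFiles = Pattern.compile("-?[0-9]+");

        TreeSet<String> names = new TreeSet<String>();
        for (long seqid : new long[] {9, 100, 23}) {
          names.add(String.format("%019d", seqid));  // zero-padded file name
        }
        // Lexicographic order now equals numeric order:
        // [0000000000000000009, 0000000000000000023, 0000000000000000100]
        System.out.println(names);

        // A moved-aside bad file no longer matches the pattern, so
        // getSplitEditFilesSorted filters it out (timestamp is illustrative).
        System.out.println(editFiles.matcher("0000000000000000023").matches());
        System.out.println(editFiles.matcher("0000000000000000023.1279319301000").matches());
      }
    }

The first matches() call prints true; the second prints false.
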
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Fri Jul 16 22:28:21 2010
@@ -476,9 +476,6 @@ public class TestHLogSplit {
     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
   }
 
-
-
-
   /**
    * This thread will keep writing to the file after the split process has started
    * It simulates a region server that was considered dead but woke up and wrote
@@ -610,11 +607,14 @@ public class TestHLogSplit {
     }
   }
 
-  private Path getLogForRegion(Path rootdir, byte[] table, String region) {
-    return new Path(HRegion.getRegionDir(HTableDescriptor
-            .getTableDir(rootdir, table),
-            HRegionInfo.encodeRegionName(region.getBytes())),
-            HLog.RECOVERED_EDITS);
+  private Path getLogForRegion(Path rootdir, byte[] table, String region)
+  throws IOException {
+    Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+    Path editsdir = HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+      HRegionInfo.encodeRegionName(region.getBytes())));
+    FileStatus [] files = this.fs.listStatus(editsdir);
+    assertEquals(1, files.length);
+    return files[0].getPath();
   }
 
   private void corruptHLog(Path path, Corruptions corruption, boolean close,
@@ -722,8 +722,15 @@ public class TestHLogSplit {
     FileStatus[] f2 = fs.listStatus(p2);
 
     for (int i=0; i<f1.length; i++) {
-      if (!logsAreEqual(new Path(f1[i].getPath(), HLog.RECOVERED_EDITS),
-              new Path(f2[i].getPath(), HLog.RECOVERED_EDITS))) {
+      // Regions now have a directory named RECOVERED_EDITS_DIR and in it
+      // are the split edit files.  Below we presume only one.
+      Path rd1 = HLog.getRegionDirRecoveredEditsDir(f1[i].getPath());
+      FileStatus [] rd1fs = fs.listStatus(rd1);
+      assertEquals(1, rd1fs.length);
+      Path rd2 = HLog.getRegionDirRecoveredEditsDir(f2[i].getPath());
+      FileStatus [] rd2fs = fs.listStatus(rd2);
+      assertEquals(1, rd2fs.length);
+      if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
         return -1;
       }
     }
@@ -745,6 +752,4 @@ public class TestHLogSplit {
     }
     return true;
   }
-
-
 }

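The test changes above track the on-disk layout change: log splitting used to
write a single file named recovered.edits directly under each region directory,
whereas it now creates a recovered.edits/ directory holding one
sequenceid-named file per split pass. Reusing the region from the
getRegionSplitEditsPath javadoc (with two illustrative seqids), a region split
twice might hold:

    /hbase/some_table/2323432434/recovered.edits/0000000000000002332
    /hbase/some_table/2323432434/recovered.edits/0000000000000005120

which is why getLogForRegion and compareHLogSplitDirs now list the directory
and assert on the single edits file these particular tests produce.
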
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri Jul 16 22:28:21 2010
@@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -115,6 +115,57 @@ public class TestWALReplay {
   }
 
   /**
+   * Tests for hbase-2727.
+   * @throws Exception
+   * @see https://issues.apache.org/jira/browse/HBASE-2727
+   */
+  @Test
+  public void test2727() throws Exception {
+    // Test being able to have > 1 set of edits in the recovered.edits directory.
+    // Ensure edits are replayed properly.
+    final String tableNameStr = "test2727";
+    HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+    Path basedir = new Path(hbaseRootDir, tableNameStr);
+    deleteDir(basedir);
+
+    final byte [] tableName = Bytes.toBytes(tableNameStr);
+    final byte [] rowName = tableName;
+
+    HLog wal1 = createWAL(this.conf);
+    // Add 1k to each family.
+    final int countPerFamily = 1000;
+    for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1);
+    }
+    wal1.close();
+    runWALSplit(this.conf);
+
+    HLog wal2 = createWAL(this.conf);
+    // Up the sequenceid so that these edits are after the ones added above.
+    wal2.setSequenceNumber(wal1.getSequenceNumber());
+    // Add 1k to each family.
+    for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2);
+    }
+    wal2.close();
+    runWALSplit(this.conf);
+
+    HLog wal3 = createWAL(this.conf);
+    wal3.setSequenceNumber(wal2.getSequenceNumber());
+    try {
+      final HRegion region = new HRegion(basedir, wal3, this.fs, this.conf, hri,
+          null);
+      long seqid = region.initialize();
+      assertTrue(seqid > wal3.getSequenceNumber());
+
+      // TODO: Scan all.
+      region.close();
+    } finally {
+      wal3.closeAndDelete();
+    }
+  }
+
+  /**
    * Test case of HRegion that is only made out of bulk loaded files.  Assert
    * that we don't 'crash'.
    * @throws IOException
@@ -210,8 +261,8 @@ public class TestWALReplay {
     HLog wal2 = createWAL(this.conf);
     HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, null) {
       @Override
-      protected void restoreEdit(KeyValue kv) throws IOException {
-        super.restoreEdit(kv);
+      protected boolean restoreEdit(Store s, KeyValue kv) {
+        super.restoreEdit(s, kv);
         throw new RuntimeException("Called when it should not have been!");
       }
     };
@@ -221,7 +272,7 @@ public class TestWALReplay {
     assertTrue(seqid + result.size() < seqid2);
 
     // Next test.  Add more edits, then 'crash' this region by stealing its wal
-    // out from under it and assert that replay of the log addes the edits back
+    // out from under it and assert that replay of the log adds the edits back
     // correctly when region is opened again.
     for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
@@ -242,9 +293,10 @@ public class TestWALReplay {
     final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
     HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
       @Override
-      protected void restoreEdit(KeyValue kv) throws IOException {
-        super.restoreEdit(kv);
+      protected boolean restoreEdit(Store s, KeyValue kv) {
+        boolean b = super.restoreEdit(s, kv);
         countOfRestoredEdits.incrementAndGet();
+        return b;
       }
     };
     long seqid3 = region3.initialize();
@@ -317,14 +369,20 @@ public class TestWALReplay {
     newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
     // Make a new wal for new region.
     HLog newWal = createWAL(newConf);
+    final AtomicInteger flushcount = new AtomicInteger(0);
     try {
-      TestFlusher flusher = new TestFlusher();
       final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
-          flusher);
-      flusher.r = region;
+          null) {
+        protected boolean internalFlushcache(HLog wal, long myseqid)
+        throws IOException {
+          boolean b = super.internalFlushcache(wal, myseqid);
+          flushcount.incrementAndGet();
+          return b;
+        };
+      };
       long seqid = region.initialize();
-      // Assert we flushed.
-      assertTrue(flusher.count > 0);
+      // We flushed during init.
+      assertTrue(flushcount.get() > 0);
       assertTrue(seqid > wal.getSequenceNumber());
 
       Get get = new Get(rowName);
@@ -338,23 +396,6 @@ public class TestWALReplay {
     }
   }
 
-  // Flusher used in this test.  Keep count of how often we are called and
-  // actually run the flush inside here.
-  class TestFlusher implements FlushRequester {
-    private int count = 0;
-    private HRegion r;
-
-    @Override
-    public void request(HRegion region) {
-      count++;
-      try {
-        r.flushcache();
-      } catch (IOException e) {
-        throw new RuntimeException("Exception flushing", e);
-      }
-    }
-  }
-
   private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
       final byte [] rowName, final byte [] family, 
       final int count, EnvironmentEdge ee, final HLog wal)