Posted to commits@hbase.apache.org by jd...@apache.org on 2014/04/25 23:03:16 UTC

svn commit: r1590146 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver...

Author: jdcryans
Date: Fri Apr 25 21:03:15 2014
New Revision: 1590146

URL: http://svn.apache.org/r1590146
Log:
HBASE-10958 [dataloss] Bulk loading with seqids can prevent some log entries from being replayed
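
For context: on WAL replay, edits with a sequence id at or below the store's
maximum on-disk sequence id are skipped as already persisted. Before this
change, a bulk load with assignSeqId could grab a fresh sequence id without
flushing, so older edits still sitting in the memstore fell below the cutoff
and were silently dropped on recovery. A minimal, self-contained sketch of
that failure mode (plain Java, not the HBase API; all names illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class ReplaySketch {
      public static void main(String[] args) {
        long maxSeqIdOnDisk = 0;   // highest seq id found in store files at open
        List<Long> wal = new ArrayList<Long>();

        wal.add(1L);               // unflushed edit, present only in the WAL
        maxSeqIdOnDisk = 2;        // bulk-loaded HFile assigned seq id 2, no flush ran

        // Recovery: edit 1 <= 2, so it is skipped -- the dataloss of HBASE-10958.
        // The fix below flushes before assigning the bulk load's seq id, so every
        // edit below the HFile's seq id really is on disk.
        for (long seqId : wal) {
          if (seqId > maxSeqIdOnDisk) {
            System.out.println("replaying edit " + seqId);
          } else {
            System.out.println("skipping edit " + seqId + " (assumed flushed)");
          }
        }
      }
    }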

Added:
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java
Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Apr 25 21:03:15 2014
@@ -379,6 +379,81 @@ public class HRegion implements HeapSize
         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
   }
 
+  /**
+   * Objects of this class are created when flushing to describe the different states that a
+   * flush can end up in. The Result enum describes those states. The sequence id should only
+   * be specified if the flush was successful, and the failure message should only be specified
+   * if it didn't flush.
+   */
+  public static class FlushResult {
+    enum Result {
+      FLUSHED_NO_COMPACTION_NEEDED,
+      FLUSHED_COMPACTION_NEEDED,
+      // Special case where a flush didn't run because there's nothing in the memstores. Used when
+      // bulk loading to know when we can still load even if a flush didn't happen.
+      CANNOT_FLUSH_MEMSTORE_EMPTY,
+      CANNOT_FLUSH
+      // Be careful adding more states to this enum; check the methods below to make sure they still hold
+    }
+
+    final Result result;
+    final String failureReason;
+    final long flushSequenceId;
+
+    /**
+     * Convenience constructor to use when the flush is successful; the failure message is set
+     * to null.
+     * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+     * @param flushSequenceId Generated sequence id that comes right after the edits in the
+     *                        memstores.
+     */
+    FlushResult(Result result, long flushSequenceId) {
+      this(result, flushSequenceId, null);
+      assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+          .FLUSHED_COMPACTION_NEEDED;
+    }
+
+    /**
+     * Convenience constructor to use when we cannot flush.
+     * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
+     * @param failureReason Reason why we couldn't flush.
+     */
+    FlushResult(Result result, String failureReason) {
+      this(result, -1, failureReason);
+      assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
+    }
+
+    /**
+     * Constructor with all the parameters.
+     * @param result Any of the Result values.
+     * @param flushSequenceId Generated sequence id if the memstores were flushed, else -1.
+     * @param failureReason Reason why we couldn't flush, or null.
+     */
+    FlushResult(Result result, long flushSequenceId, String failureReason) {
+      this.result = result;
+      this.flushSequenceId = flushSequenceId;
+      this.failureReason = failureReason;
+    }
+
+    /**
+     * Convenience method, the equivalent of checking if result is
+     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+     * @return true if the memstores were flushed, else false.
+     */
+    public boolean isFlushSucceeded() {
+      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+          .FLUSHED_COMPACTION_NEEDED;
+    }
+
+    /**
+     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
+     * @return True if the flush requested a compaction, else false (which says nothing about
+     * whether the flush itself ran).
+     */
+    public boolean isCompactionNeeded() {
+      return result == Result.FLUSHED_COMPACTION_NEEDED;
+    }
+  }
+
   final WriteState writestate = new WriteState();
 
   long memstoreFlushSize;
@@ -686,16 +761,13 @@ public class HRegion implements HeapSize
         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
           Future<HStore> future = completionService.take();
           HStore store = future.get();
-
           this.stores.put(store.getColumnFamilyName().getBytes(), store);
-          // Do not include bulk loaded files when determining seqIdForReplay
-          long storeSeqIdForReplay = store.getMaxSequenceId(false);
+
+          long storeMaxSequenceId = store.getMaxSequenceId();
           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
-              storeSeqIdForReplay);
-          // Include bulk loaded files when determining seqIdForAssignment
-          long storeSeqIdForAssignment = store.getMaxSequenceId(true);
-          if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
-            maxSeqId = storeSeqIdForAssignment;
+              storeMaxSequenceId);
+          if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
+            maxSeqId = storeMaxSequenceId;
           }
           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
           if (maxStoreMemstoreTS > maxMemstoreTS) {
@@ -1409,11 +1481,12 @@ public class HRegion implements HeapSize
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  public boolean flushcache() throws IOException {
+  public FlushResult flushcache() throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
-      LOG.debug("Skipping flush on " + this + " because closing");
-      return false;
+      String msg = "Skipping flush on " + this + " because closing";
+      LOG.debug(msg);
+      return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
@@ -1421,9 +1494,10 @@ public class HRegion implements HeapSize
     lock.readLock().lock();
     try {
       if (this.closed.get()) {
-        LOG.debug("Skipping flush on " + this + " because closed");
-        status.abort("Skipped: closed");
-        return false;
+        String msg = "Skipping flush on " + this + " because closed";
+        LOG.debug(msg);
+        status.abort(msg);
+        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor pre-flush hooks");
@@ -1442,14 +1516,15 @@ public class HRegion implements HeapSize
                 + ", flushing=" + writestate.flushing + ", writesEnabled="
                 + writestate.writesEnabled);
           }
-          status.abort("Not flushing since "
+          String msg = "Not flushing since "
               + (writestate.flushing ? "already flushing"
-                  : "writes not enabled"));
-          return false;
+              : "writes not enabled");
+          status.abort(msg);
+          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
         }
       }
       try {
-        boolean result = internalFlushcache(status);
+        FlushResult fs = internalFlushcache(status);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
@@ -1457,7 +1532,7 @@ public class HRegion implements HeapSize
         }
 
         status.markComplete("Flush successful");
-        return result;
+        return fs;
       } finally {
         synchronized (writestate) {
           writestate.flushing = false;
@@ -1523,13 +1598,13 @@ public class HRegion implements HeapSize
    * <p> This method may block for some time.
    * @param status
    *
-   * @return true if the region needs compacting
+   * @return object describing the flush's state
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  protected boolean internalFlushcache(MonitoredTask status)
+  protected FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
     return internalFlushcache(this.log, -1, status);
   }
@@ -1543,7 +1618,7 @@ public class HRegion implements HeapSize
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
    */
-  protected boolean internalFlushcache(
+  protected FlushResult internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
   throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
@@ -1554,7 +1629,7 @@ public class HRegion implements HeapSize
     // Clear flush flag.
     // If nothing to flush, return and avoid logging start/stop flush.
     if (this.memstoreSize.get() <= 0) {
-      return false;
+      return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for " + this +
@@ -1590,9 +1665,10 @@ public class HRegion implements HeapSize
       if (wal != null) {
         Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
         if (startSeqId == null) {
-          status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName()
-              + "] - WAL is going away");
-          return false;
+          String msg = "Flush will not be started for ["
+              + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+          status.setStatus(msg);
+          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
         }
         flushSeqId = startSeqId.longValue();
       } else {
@@ -1709,7 +1785,8 @@ public class HRegion implements HeapSize
     status.setStatus(msg);
     this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
 
-    return compactionRequested;
+    return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
+        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -3381,6 +3458,22 @@ public class HRegion implements HeapSize
         return false;
       }
 
+      long seqId = -1;
+      // We need to assign a sequential ID that's in between two memstores in order to preserve
+      // the guarantee that all the edits lower than the highest sequential ID from all the
+      // HFiles are flushed on disk. See HBASE-10958.
+      if (assignSeqId) {
+        FlushResult fs = this.flushcache();
+        if (fs.isFlushSucceeded()) {
+          seqId = fs.flushSequenceId;
+        } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+          seqId = this.log.obtainSeqNum();
+        } else {
+          throw new IOException("Could not bulk load with an assigned sequential ID because the " +
+              "flush didn't run. Reason for not flushing: " + fs.failureReason);
+        }
+      }
+
       for (Pair<byte[], String> p : familyPaths) {
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
@@ -3390,7 +3483,7 @@ public class HRegion implements HeapSize
           if(bulkLoadListener != null) {
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
           }
-          store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
+          store.bulkLoadHFile(finalPath, seqId);
           if(bulkLoadListener != null) {
             bulkLoadListener.doneBulkLoad(familyName, path);
           }
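
The bulk-load hunk above is also the canonical consumer of the new FlushResult.
A condensed sketch of the pattern (flushcache(), FlushResult, and obtainSeqNum()
are as in this diff; the helper itself and its visibility assumptions are
illustrative, since FlushResult's fields are package-private):

    // Illustrative helper capturing the seq-id decision made in bulkLoadHFiles.
    private long seqIdForBulkLoad(HRegion region, HLog log) throws IOException {
      HRegion.FlushResult fr = region.flushcache();
      if (fr.isFlushSucceeded()) {
        return fr.flushSequenceId;   // sits between two memstores, all lower edits flushed
      }
      if (fr.result == HRegion.FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
        return log.obtainSeqNum();   // nothing to flush, a fresh seq id is safe
      }
      throw new IOException("Flush did not run: " + fr.failureReason);
    }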

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Apr 25 21:03:15 2014
@@ -3704,7 +3704,7 @@ public class HRegionServer implements Cl
       }
       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
       if (shouldFlush) {
-        boolean result = region.flushcache();
+        boolean result = region.flushcache().isCompactionNeeded();
         if (result) {
           this.compactSplitThread.requestSystemCompaction(region,
               "Compaction through user triggered flush");

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Apr 25 21:03:15 2014
@@ -359,8 +359,8 @@ public class HStore implements Store {
   /**
    * @return The maximum sequence id in all store files. Used for log replay.
    */
-  long getMaxSequenceId(boolean includeBulkFiles) {
-    return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
+  long getMaxSequenceId() {
+    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
   }
 
   @Override

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Fri Apr 25 21:03:15 2014
@@ -458,7 +458,7 @@ class MemStoreFlusher implements FlushRe
     }
     lock.readLock().lock();
     try {
-      boolean shouldCompact = region.flushcache();
+      boolean shouldCompact = region.flushcache().isCompactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
       if (shouldSplit) {

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Apr 25 21:03:15 2014
@@ -301,21 +301,15 @@ public class StoreFile {
 
   /**
    * Return the highest sequence ID found across all storefiles in
-   * the given list. Store files that were created by a mapreduce
-   * bulk load are ignored, as they do not correspond to any edit
-   * log items.
+   * the given list.
    * @param sfs
-   * @param includeBulkLoadedFiles
    * @return 0 if no store files are provided or this is a Store that
    * does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
-      boolean includeBulkLoadedFiles) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
     long max = 0;
     for (StoreFile sf : sfs) {
-      if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
-        max = Math.max(max, sf.getMaxSequenceId());
-      }
+      max = Math.max(max, sf.getMaxSequenceId());
     }
     return max;
   }
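
With the includeBulkLoadedFiles flag gone, bulk-loaded files now count toward
the maximum sequence id a region computes at open time, which doubles as the
WAL replay cutoff. That is only safe because of the flush-before-bulk-load
change in HRegion above. A hedged restatement of the resulting invariant
(plain Java, names illustrative):

    // Invariant this commit restores: every WAL edit with a seq id at or below
    // the max over *all* store files (bulk-loaded included) is already on disk,
    // so replay may skip it without losing data.
    static boolean safeToSkip(long editSeqId, java.util.Collection<Long> fileSeqIds) {
      long cutoff = 0;   // 0 when the store has no files yet
      for (long id : fileSeqIds) {
        cutoff = Math.max(cutoff, id);
      }
      return editSeqId <= cutoff;
    }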

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Fri Apr 25 21:03:15 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.*;
 import org.junit.experimental.categories.Category;
 
@@ -62,10 +63,6 @@ public class TestLoadIncrementalHFiles {
     Bytes.toBytes("ppp")
   };
 
-  public static int BLOCKSIZE = 64*1024;
-  public static String COMPRESSION =
-    Compression.Algorithm.NONE.getName();
-
   static HBaseTestingUtility util = new HBaseTestingUtility();
 
   @BeforeClass
@@ -142,7 +139,7 @@ public class TestLoadIncrementalHFiles {
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
-      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
           + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
     }
     int expectedRows = hfileIdx * 1000;
@@ -184,7 +181,7 @@ public class TestLoadIncrementalHFiles {
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
-      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
           + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
     }
 
@@ -228,7 +225,7 @@ public class TestLoadIncrementalHFiles {
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
-      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
           + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
     }
 
@@ -272,7 +269,7 @@ public class TestLoadIncrementalHFiles {
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
-    createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
         Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
@@ -305,37 +302,6 @@ public class TestLoadIncrementalHFiles {
     return count;
   }
 
-
-  /**
-   * Create an HFile with the given number of rows between a given
-   * start key and end key.
-   * TODO put me in an HFileTestUtil or something?
-   */
-  static void createHFile(
-      Configuration conf,
-      FileSystem fs, Path path,
-      byte[] family, byte[] qualifier,
-      byte[] startKey, byte[] endKey, int numRows) throws IOException
-  {
-    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
-        .withPath(fs, path)
-        .withBlockSize(BLOCKSIZE)
-        .withCompression(COMPRESSION)
-        .create();
-    long now = System.currentTimeMillis();
-    try {
-      // subtract 2 since iterateOnSplits doesn't include boundary keys
-      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
-        KeyValue kv = new KeyValue(key, family, qualifier, now, key);
-        writer.append(kv);
-      }
-    } finally {
-      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-          Bytes.toBytes(System.currentTimeMillis()));
-      writer.close();
-    }
-  }
-
   private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
     Integer value = map.containsKey(first)?(Integer)map.get(first):0;
     map.put(first, value+1);
@@ -411,7 +377,7 @@ public class TestLoadIncrementalHFiles {
     byte[] from = Bytes.toBytes("begin");
     byte[] to = Bytes.toBytes("end");
     for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
-      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
           + i), FAMILY, QUALIFIER, from, to, 1000);
     }
 

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Apr 25 21:03:15 2014
@@ -3396,7 +3396,7 @@ public class TestHRegion {
 
         @Override
         public void doAnAction() throws Exception {
-          if (region.flushcache()) {
+          if (region.flushcache().isCompactionNeeded()) {
             ++flushesSinceCompact;
           }
           // Compact regularly to avoid creating too many files and exceeding
@@ -4241,6 +4241,42 @@ public class TestHRegion {
     }
   }
 
+  /**
+   * Test that we get the expected flush results back
+   * @throws IOException
+   */
+  @Test
+  public void testFlushResult() throws IOException {
+    String method = name.getMethodName();
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Bytes.toBytes("family");
+
+    this.region = initHRegion(tableName, method, CONF, family);
+
+    // empty memstore, flush doesn't run
+    HRegion.FlushResult fr = region.flushcache();
+    assertFalse(fr.isFlushSucceeded());
+    assertFalse(fr.isCompactionNeeded());
+
+    // Flush a couple of times to create files while staying under the compaction threshold
+    for (int i = 0; i < 2; i++) {
+      Put put = new Put(tableName).add(family, family, tableName);
+      region.put(put);
+      fr = region.flushcache();
+      assertTrue(fr.isFlushSucceeded());
+      assertFalse(fr.isCompactionNeeded());
+    }
+
+    // Two more flushes push us past the threshold; compactions are now needed
+    for (int i = 0; i < 2; i++) {
+      Put put = new Put(tableName).add(family, family, tableName);
+      region.put(put);
+      fr = region.flushcache();
+      assertTrue(fr.isFlushSucceeded());
+      assertTrue(fr.isCompactionNeeded());
+    }
+  }
+
   private Configuration initSplit() {
     // Always compact if there is more than one store file.
     CONF.setInt("hbase.hstore.compactionThreshold", 2);

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1590146&r1=1590145&r2=1590146&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri Apr 25 21:03:15 2014
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -315,7 +316,7 @@ public class TestWALReplay {
   throws IOException, SecurityException, IllegalArgumentException,
       NoSuchFieldException, IllegalAccessException, InterruptedException {
     final TableName tableName =
-        TableName.valueOf("testReplayEditsWrittenViaHRegion");
+        TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
     deleteDir(basedir);
@@ -326,18 +327,21 @@ public class TestWALReplay {
     HLog wal = createWAL(this.conf);
     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
     Path f =  new Path(basedir, "hfile");
-    HFile.Writer writer =
-      HFile.getWriterFactoryNoCache(conf).withPath(fs, f).create();
+
     byte [] family = htd.getFamilies().iterator().next().getName();
-    byte [] row = tableName.getName();
-    writer.append(new KeyValue(row, family, family, row));
-    writer.close();
+    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
+        Bytes.toBytes("z"), 10);
     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
     hfs.add(Pair.newPair(family, f.toString()));
     region.bulkLoadHFiles(hfs, true);
+
+    // Add an edit so there's something in the WAL
+    byte [] row = tableName.getName();
     region.put((new Put(row)).add(family, family, family));
     wal.sync();
+    final int rowsInsertedCount = 11;
+
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
 
     // Now 'crash' the region by stealing its wal
     final Configuration newConf = HBaseConfiguration.create(this.conf);
@@ -352,6 +356,7 @@ public class TestWALReplay {
           hbaseRootDir, hri, htd, wal2);
         long seqid2 = region2.getOpenSeqNum();
         assertTrue(seqid2 > -1);
+        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
 
         // I can't close wal1.  It's been appropriated when we split.
         region2.close();
@@ -362,6 +367,78 @@ public class TestWALReplay {
   }
 
   /**
+   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
+   * files) and an edit in the memstore.
+   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
+   * from being replayed"
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testCompactedBulkLoadedFiles()
+      throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName =
+        TableName.valueOf("testCompactedBulkLoadedFiles");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region2 = HRegion.createHRegion(hri,
+        hbaseRootDir, this.conf, htd);
+    HRegion.closeHRegion(region2);
+    HLog wal = createWAL(this.conf);
+    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
+
+    // Add an edit so there's something in the WAL
+    byte [] row = tableName.getName();
+    byte [] family = htd.getFamilies().iterator().next().getName();
+    region.put((new Put(row)).add(family, family, family));
+    wal.sync();
+
+    List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
+    for (int i = 0; i < 3; i++) {
+      Path f = new Path(basedir, "hfile"+i);
+      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
+          Bytes.toBytes(i + "50"), 10);
+      hfs.add(Pair.newPair(family, f.toString()));
+    }
+    region.bulkLoadHFiles(hfs, true);
+    final int rowsInsertedCount = 31;
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // major compact to turn all the bulk loaded files into one normal file
+    region.compactStores(true);
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // Now 'crash' the region by stealing its wal
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf,
+        tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction() {
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        HLog wal2 = createWAL(newConf);
+
+        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
+            hbaseRootDir, hri, htd, wal2);
+        long seqid2 = region2.getOpenSeqNum();
+        assertTrue(seqid2 > -1);
+        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
+
+        // I can't close wal1.  It's been appropriated when we split.
+        region2.close();
+        wal2.closeAndDelete();
+        return null;
+      }
+    });
+  }
+
+
+  /**
    * Test writing edits into an HRegion, closing it, splitting logs, opening
    * Region again.  Verify seqids.
    * @throws IOException
@@ -750,14 +827,14 @@ public class TestWALReplay {
         try {
           final HRegion region =
               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
-            protected boolean internalFlushcache(
+            protected FlushResult internalFlushcache(
                 final HLog wal, final long myseqid, MonitoredTask status)
             throws IOException {
               LOG.info("InternalFlushCache Invoked");
-              boolean b = super.internalFlushcache(wal, myseqid,
+              FlushResult fs = super.internalFlushcache(wal, myseqid,
                   Mockito.mock(MonitoredTask.class));
               flushcount.incrementAndGet();
-              return b;
+              return fs;
             };
           };
           long seqid = region.initialize();

Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java?rev=1590146&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java Fri Apr 25 21:03:15 2014
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+import java.io.IOException;
+
+/**
+ * Utility class for HFile-related testing.
+ */
+public class HFileTestUtil {
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key.
+   */
+  public static void createHFile(
+      Configuration configuration,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows) throws IOException
+  {
+    HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
+        .withPath(fs, path)
+        .create();
+    long now = System.currentTimeMillis();
+    try {
+      // subtract 2 since iterateOnSplits doesn't include boundary keys
+      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+        KeyValue kv = new KeyValue(key, family, qualifier, now, key);
+        writer.append(kv);
+      }
+    } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+          Bytes.toBytes(System.currentTimeMillis()));
+      writer.close();
+    }
+  }
+}
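
A typical call, mirroring the updated tests above (configuration, path, family,
qualifier, and row count are illustrative; the usual HBase test imports are
assumed):

    // Write a 1000-row HFile spanning ["aaa", "zzz"], as the tests above do.
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    HFileTestUtil.createHFile(conf, fs, new Path("/tmp/hfile_0"),
        Bytes.toBytes("f"), Bytes.toBytes("q"),
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);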