Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/27 19:21:18 UTC

svn commit: r798225 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/hfile/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/io/hfile/ src/test/org/apache/hadoop/hbase/reg...

Author: stack
Date: Mon Jul 27 17:21:17 2009
New Revision: 798225

URL: http://svn.apache.org/viewvc?rev=798225&view=rev
Log:
HBASE-1686 major compaction can create empty store files, causing AIOOB when trying to read

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java

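For context on the failure mode this commit addresses: a major compaction in which every cell is expired or deleted used to write a store file with zero entries, so its block index was empty, and the first read that asked the file for its first key indexed past the end of that index. A minimal sketch of the pre-fix behaviour (illustrative names only, not actual HBase code):

    // Sketch of the AIOOB described in HBASE-1686; blockKeys stands in for the
    // HFile block index, which has length 0 for an entry-less store file.
    public class EmptyIndexSketch {
      public static void main(String[] args) {
        byte[][] blockKeys = new byte[0][];   // empty block index
        byte[] firstKey = blockKeys[0];       // ArrayIndexOutOfBoundsException: 0
        System.out.println(firstKey.length);
      }
    }
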
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Jul 27 17:21:17 2009
@@ -289,7 +289,8 @@
    HBASE-1709  Thrift getRowWithColumns doesn't accept column-family only
                (Mathias Lehmann via Stack)
    HBASE-1692  Web UI is extremely slow / freezes up if you have many tables
-   
+   HBASE-1686  major compaction can create empty store files, causing AIOOB
+               when trying to read
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java Mon Jul 27 17:21:17 2009
@@ -643,6 +643,7 @@
    * @return Key as a String.
    */
   public static String keyToString(final byte [] b, final int o, final int l) {
+    if (b == null) return "";
     int rowlength = Bytes.toShort(b, o);
     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;

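The KeyValue change is a one-line defensive guard: keyToString now renders a null key buffer as the empty string instead of dereferencing it. A small usage sketch (keyToString's signature is taken from the diff above; the wrapping class is hypothetical):

    import org.apache.hadoop.hbase.KeyValue;

    // Hedged usage sketch: an empty HFile now reports null for its first/last
    // key, and code that funnels such a key through keyToString() gets ""
    // instead of an exception.
    public class KeyToStringSketch {
      public static void main(String[] args) {
        byte[] firstKey = null;                               // what an empty file reports
        String rendered = KeyValue.keyToString(firstKey, 0, 0);
        System.out.println("first key: [" + rendered + "]");  // prints: first key: []
      }
    }
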
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java Mon Jul 27 17:21:17 2009
@@ -974,13 +974,13 @@
     }
 
     /**
-     * @return First key in the file.
+     * @return First key in the file.  May be null if file has no entries.
      */
     public byte [] getFirstKey() {
       if (blockIndex == null) {
         throw new RuntimeException("Block index not loaded");
       }
-      return blockIndex.blockKeys[0];
+      return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
     }
 
     public int getEntries() {
@@ -991,13 +991,13 @@
     }
 
     /**
-     * @return Last key in the file.
+     * @return Last key in the file.  May be null if file has no entries.
      */
     public byte [] getLastKey() {
       if (!isFileInfoLoaded()) {
         throw new RuntimeException("Load file info first");
       }
-      return this.lastkey;
+      return this.blockIndex.isEmpty()? null: this.lastkey;
     }
 
     /**

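With this change, getFirstKey() and getLastKey() are nullable for files with no entries, so callers must guard before decoding. A hedged sketch of the caller-side contract (the helper below is illustrative, not HBase API; it assumes the reader's file info and block index are already loaded):

    import org.apache.hadoop.hbase.io.hfile.HFile;

    // Hedged sketch: both key accessors may return null for an entry-less
    // HFile after this commit, so check before using the bytes.
    public class EmptyHFileCheck {
      static boolean isEmpty(HFile.Reader reader) {
        // Assumes reader.loadFileInfo() has been called.
        return reader.getFirstKey() == null || reader.getLastKey() == null;
      }
    }
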
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Jul 27 17:21:17 2009
@@ -619,19 +619,18 @@
    * thread must be able to block for long periods.
    * 
    * <p>During this time, the Store can work as usual, getting values from
-   * MapFiles and writing new MapFiles from the memstore.
+   * StoreFiles and writing new StoreFiles from the memstore.
    * 
-   * Existing MapFiles are not destroyed until the new compacted TreeMap is 
+   * Existing StoreFiles are not destroyed until the new compacted StoreFile is 
    * completely written-out to disk.
    *
-   * The compactLock prevents multiple simultaneous compactions.
+   * <p>The compactLock prevents multiple simultaneous compactions.
    * The structureLock prevents us from interfering with other write operations.
    * 
-   * We don't want to hold the structureLock for the whole time, as a compact() 
+   * <p>We don't want to hold the structureLock for the whole time, as a compact() 
    * can be lengthy and we want to allow cache-flushes during this period.
    * 
-   * @param mc True to force a major compaction regardless of
-   * thresholds
+   * @param mc True to force a major compaction regardless of thresholds
    * @return row to split around if a split is needed, null otherwise
    * @throws IOException
    */
@@ -639,29 +638,32 @@
     boolean forceSplit = this.regioninfo.shouldSplit(false);
     boolean majorcompaction = mc;
     synchronized (compactLock) {
-      long maxId = -1;
       // filesToCompact are sorted oldest to newest.
-      List<StoreFile> filesToCompact = null;
-      filesToCompact = new ArrayList<StoreFile>(this.storefiles.values());
-      if (filesToCompact.size() <= 0) {
+      List<StoreFile> filesToCompact =
+        new ArrayList<StoreFile>(this.storefiles.values());
+      if (filesToCompact.isEmpty()) {
         LOG.debug(this.storeNameStr + ": no store files to compact");
         return null;
       }
-      // The max-sequenceID in any of the to-be-compacted TreeMaps is the 
-      // last key of storefiles.
-      maxId = this.storefiles.lastKey().longValue();
+
+      // Max-sequenceID is the last key of the storefiles TreeMap
+      long maxId = this.storefiles.lastKey().longValue();
+
       // Check to see if we need to do a major compaction on this region.
       // If so, change doMajorCompaction to true to skip the incremental
       // compacting below. Only check if doMajorCompaction is not true.
       if (!majorcompaction) {
         majorcompaction = isMajorCompaction(filesToCompact);
       }
+
       boolean references = hasReferences(filesToCompact);
       if (!majorcompaction && !references && 
           (forceSplit || (filesToCompact.size() < compactionThreshold))) {
         return checkSplit(forceSplit);
       }
-      if (!fs.exists(this.regionCompactionDir) && !fs.mkdirs(this.regionCompactionDir)) {
+
+      if (!fs.exists(this.regionCompactionDir) &&
+          !fs.mkdirs(this.regionCompactionDir)) {
         LOG.warn("Mkdir on " + this.regionCompactionDir.toString() + " failed");
         return checkSplit(forceSplit);
       }
@@ -670,7 +672,7 @@
       // selection.
       int countOfFiles = filesToCompact.size();
       long totalSize = 0;
-      long[] fileSizes = new long[countOfFiles];
+      long [] fileSizes = new long[countOfFiles];
       long skipped = 0;
       int point = 0;
       for (int i = 0; i < countOfFiles; i++) {
@@ -689,6 +691,7 @@
         fileSizes[i] = len;
         totalSize += len;
       }
+ 
       if (!majorcompaction && !references) {
         // Here we select files for incremental compaction.  
         // The rule is: if the largest(oldest) one is more than twice the 
@@ -719,26 +722,17 @@
         }
       }
  
-      // Step through them, writing to the brand-new file
-      HFile.Writer writer = getWriter(this.regionCompactionDir);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
-          (references? ", hasReferences=true,": " ") + " into " +
-          FSUtils.getPath(writer.getPath()));
-      }
-      try {
-        compact(writer, filesToCompact, majorcompaction);
-      } finally {
-        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        StoreFile.appendMetadata(writer, maxId, majorcompaction);
-        writer.close();
-      }
-
+      // Ready to go.  Have list of files to compact.
+      LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
+        (references? ", hasReferences=true,": " ") + " into " +
+          FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
+      HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
-      completeCompaction(filesToCompact, writer);
+      StoreFile sf = completeCompaction(filesToCompact, writer);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Completed" + (majorcompaction? " major ": " ") +
           "compaction of " + this.storeNameStr +
+          "; new storefile is " + (sf == null? "none": sf.toString()) +
           "; store size is " + StringUtils.humanReadableInt(storeSize));
       }
     }
@@ -834,16 +828,17 @@
    * @param writer output writer
    * @param filesToCompact which files to compact
    * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @param maxId Readers maximum sequence id.
+   * @return Product of compaction or null if all cells expired or deleted and
+   * nothing made it through the compaction.
    * @throws IOException
    */
-  private void compact(HFile.Writer writer,
-                       List<StoreFile> filesToCompact,
-                       boolean majorCompaction) throws IOException {
-    // for each file, obtain a scanner:
+  private HFile.Writer compact(final List<StoreFile> filesToCompact,
+      final boolean majorCompaction, final long maxId)
+  throws IOException {
+    // For each file, obtain a scanner:
     KeyValueScanner [] scanners = new KeyValueScanner[filesToCompact.size()];
-    // init:
     for (int i = 0; i < filesToCompact.size(); ++i) {
-      // TODO open a new HFile.Reader w/o block cache.
       Reader r = filesToCompact.get(i).getReader();
       if (r == null) {
         LOG.warn("StoreFile " + filesToCompact.get(i) + " has a null Reader");
@@ -853,82 +848,97 @@
       scanners[i] = new StoreFileScanner(r.getScanner(false));
     }
 
+    // Make the instantiation lazy in case compaction produces no product; i.e.
+    // where all source cells are expired or deleted.
+    HFile.Writer writer = null;
+    try {
     if (majorCompaction) {
       InternalScanner scanner = null;
-
       try {
         Scan scan = new Scan();
         scan.setMaxVersions(family.getMaxVersions());
         scanner = new StoreScanner(this, scan, scanners);
-
         // since scanner.next() can return 'false' but still be delivering data,
         // we have to use a do/while loop.
-        ArrayList<KeyValue> row = new ArrayList<KeyValue>();
+        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
         boolean more = true;
-        while ( more ) {
-          more = scanner.next(row);
+        while (more) {
+          more = scanner.next(kvs);
           // output to writer:
-          for (KeyValue kv : row) {
+          for (KeyValue kv : kvs) {
+            if (writer == null) writer = getWriter(this.regionCompactionDir);
             writer.append(kv);
           }
-          row.clear();
+          kvs.clear();
         }
       } finally {
-        if (scanner != null)
-          scanner.close();
+        if (scanner != null) scanner.close();
       }
     } else {
       MinorCompactingStoreScanner scanner = null;
       try {
         scanner = new MinorCompactingStoreScanner(this, scanners);
-
-        while ( scanner.next(writer) ) { }
+        writer = getWriter(this.regionCompactionDir);
+        while (scanner.next(writer)) {
+          // Nothing to do
+        }
       } finally {
         if (scanner != null)
           scanner.close();
       }
     }
-
+    } finally {
+      if (writer != null) {
+        StoreFile.appendMetadata(writer, maxId, majorCompaction);
+        writer.close();
+      }
+    }
+    return writer;
   }
 
   /*
    * It's assumed that the compactLock  will be acquired prior to calling this 
    * method!  Otherwise, it is not thread-safe!
    *
-   * It works by processing a compaction that's been written to disk.
+   * <p>It works by processing a compaction that's been written to disk.
    * 
    * <p>It is usually invoked at the end of a compaction, but might also be
    * invoked at HStore startup, if the prior execution died midway through.
    * 
    * <p>Moving the compacted TreeMap into place means:
    * <pre>
-   * 1) Moving the new compacted MapFile into place
-   * 2) Unload all replaced MapFiles, close and collect list to delete.
+   * 1) Moving the new compacted StoreFile into place
+   * 2) Unload all replaced StoreFile, close and collect list to delete.
    * 3) Loading the new TreeMap.
    * 4) Compute new store size
    * </pre>
    * 
    * @param compactedFiles list of files that were compacted
-   * @param compactedFile HStoreFile that is the result of the compaction
+   * @param compactedFile StoreFile that is the result of the compaction
+   * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private void completeCompaction(final List<StoreFile> compactedFiles,
+  private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
     final HFile.Writer compactedFile)
   throws IOException {
-    // 1. Moving the new files into place.
-    Path p = null;
-    try {
-      p = StoreFile.rename(this.fs, compactedFile.getPath(),
-        StoreFile.getRandomFilename(fs, this.homedir));
-    } catch (IOException e) {
-      LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
-      return;
+    // 1. Moving the new files into place -- if there is a new file (may not
+    // be if all cells were expired or deleted).
+    StoreFile result = null;
+    if (compactedFile != null) {
+      Path p = null;
+      try {
+        p = StoreFile.rename(this.fs, compactedFile.getPath(),
+          StoreFile.getRandomFilename(fs, this.homedir));
+      } catch (IOException e) {
+        LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
+        return null;
+      }
+      result = new StoreFile(this.fs, p, blockcache, this.conf, this.inMemory);
     }
-    StoreFile finalCompactedFile = new StoreFile(this.fs, p, blockcache, 
-      this.conf, this.inMemory);
     this.lock.writeLock().lock();
     try {
       try {
+        // 2. Unloading
         // 3. Loading the new TreeMap.
         // Change this.storefiles so it reflects new state but do not
         // delete old store files until we have sent out notification of
@@ -939,10 +949,12 @@
             this.storefiles.remove(e.getKey());
           }
         }
-        // Add new compacted Reader and store file.
-        Long orderVal = Long.valueOf(finalCompactedFile.getMaxSequenceId());
-        this.storefiles.put(orderVal, finalCompactedFile);
-        // Tell observers that list of Readers has changed.
+        // If a StoreFile result, move it into place.  May be null.
+        if (result != null) {
+          Long orderVal = Long.valueOf(result.getMaxSequenceId());
+          this.storefiles.put(orderVal, result);
+        }
+        // Tell observers that list of StoreFiles has changed.
         notifyChangedReadersObservers();
         // Finally, delete old store files.
         for (StoreFile hsf: compactedFiles) {
@@ -950,11 +962,10 @@
         }
       } catch (IOException e) {
         e = RemoteExceptionHandler.checkIOException(e);
-        LOG.error("Failed replacing compacted files for " +
-            this.storeNameStr +
-            ". Compacted file is " + finalCompactedFile.toString() +
-            ".  Files replaced are " + compactedFiles.toString() +
-            " some of which may have been already removed", e);
+        LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+          ". Compacted file is " + (result == null? "none": result.toString()) +
+          ".  Files replaced " + compactedFiles.toString() +
+          " some of which may have been already removed", e);
       }
       // 4. Compute new store size
       this.storeSize = 0L;
@@ -969,6 +980,7 @@
     } finally {
       this.lock.writeLock().unlock();
     }
+    return result;
   }
 
   // ////////////////////////////////////////////////////////////////////////////

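The heart of the fix is the rewritten Store.compact(): the HFile.Writer is created lazily, only when the first KeyValue survives the compaction scan, and the metadata/close bookkeeping now lives inside compact() itself. If nothing survives, compact() returns null and completeCompaction() skips the move-into-place step and returns a null StoreFile. A stripped-down sketch of that shape (illustrative types and helpers, not the real Store internals):

    import java.io.IOException;
    import java.util.List;

    // Hedged sketch of the lazy-writer pattern used by the new compact():
    // the output is only created once a surviving cell shows up, so an
    // all-expired/all-deleted compaction produces no file and returns null.
    public class LazyCompactionSketch {
      interface Output {
        void append(String cell) throws IOException;
        void close() throws IOException;
      }

      static Output compact(List<String> survivingCells) throws IOException {
        Output out = null;
        try {
          for (String cell : survivingCells) {
            if (out == null) {
              out = createOutput();   // create the compaction output lazily
            }
            out.append(cell);
          }
        } finally {
          if (out != null) {
            out.close();              // the real code appends seqid metadata first
          }
        }
        return out;                   // null tells the caller there is no new store file
      }

      static Output createOutput() {
        return new Output() {
          public void append(String cell) { /* write cell */ }
          public void close() { /* flush and close */ }
        };
      }
    }

In the same spirit, completeCompaction() only swaps a new StoreFile into this.storefiles when the writer was non-null, which is why the Get path (StoreFileGetScan below) must now tolerate a null first key from an empty file.
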
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java Mon Jul 27 17:21:17 2009
@@ -62,7 +62,7 @@
   public void get(List<KeyValue> result) throws IOException {
     for(HFileScanner scanner : this.scanners) {
       this.matcher.update();
-      if(getStoreFile(scanner, result) || matcher.isDone()) {
+      if (getStoreFile(scanner, result) || matcher.isDone()) {
         return;
       }
     }
@@ -77,11 +77,13 @@
    */
   public boolean getStoreFile(HFileScanner scanner, List<KeyValue> result) 
   throws IOException {
-    if(scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(),
+    if (scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(),
         startKey.getKeyLength()) == -1) {
       // No keys in StoreFile at or after specified startKey
       // First row may be = our row, so we have to check anyways.
       byte [] firstKey = scanner.getReader().getFirstKey();
+      // Key may be null if storefile is empty.
+      if (firstKey == null) return false;
       short rowLen = Bytes.toShort(firstKey, 0, Bytes.SIZEOF_SHORT);
       int rowOffset = Bytes.SIZEOF_SHORT;
       if (this.matcher.rowComparator.compareRows(firstKey, rowOffset, rowLen,

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java Mon Jul 27 17:21:17 2009
@@ -52,6 +52,21 @@
   private final int minBlockSize = 512;
   private static String localFormatter = "%010d";
 
+  /**
+   * Test empty HFile.
+   * Test all features work reasonably when hfile is empty of entries.
+   * @throws IOException 
+   */
+  public void testEmptyHFile() throws IOException {
+    Path f = new Path(ROOT_DIR, getName());
+    Writer w = new Writer(this.fs, f);
+    w.close();
+    Reader r = new Reader(fs, f, null, false);
+    r.loadFileInfo();
+    assertNull(r.getFirstKey());
+    assertNull(r.getLastKey());
+  }
+
   // write some records into the tfile
   // write them twice
   private int writeSomeRecords(Writer writer, int start, int n)
@@ -213,7 +228,7 @@
     reader.loadFileInfo();
     assertNull(reader.getMetaBlock("non-existant"));
   }
-  
+
   /**
    * Make sure the orginals for our compression libs doesn't change on us.
    */

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java Mon Jul 27 17:21:17 2009
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,13 +29,16 @@
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+
 /**
  * Test compactions
  */
@@ -83,7 +88,41 @@
     }
     super.tearDown();
   }
-  
+
+  /**
+   * Test that on a major compaction, if all cells are expired or deleted, then
+   * we'll end up with no product.  Make sure scanner over region returns
+   * right answer in this case - and that it just basically works.
+   * @throws IOException
+   */
+  public void testMajorCompactingToNoOutput() throws IOException {
+    createStoreFile(r);
+    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+      createStoreFile(r);
+    }
+    // Now delete everything.
+    InternalScanner s = r.getScanner(new Scan());
+    do {
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      boolean result = s.next(results);
+      r.delete(new Delete(results.get(0).getRow()), null, false);
+      if (!result) break;
+    } while(true);
+    // Flush
+    r.flushcache();
+    // Major compact.
+    r.compactStores(true);
+    s = r.getScanner(new Scan());
+    int counter = 0;
+    do {
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      boolean result = s.next(results);
+      if (!result) break;
+      counter++;
+    } while(true);
+    assertEquals(0, counter);
+  }
+
   /**
    * Run compaction and flushing memstore
    * Assert deletes get cleaned up.
@@ -195,7 +234,7 @@
     count = count();
     assertTrue(count == 0);
   }
-  
+
   private int count() throws IOException {
     int count = 0;
     for (StoreFile f: this.r.stores.

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Mon Jul 27 17:21:17 2009
@@ -8,6 +8,8 @@
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 
+import junit.framework.TestCase;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -16,11 +18,10 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Progressable;
 
-import junit.framework.TestCase;
-
 /**
  * Test class fosr the Store 
  */
@@ -87,6 +88,38 @@
   //////////////////////////////////////////////////////////////////////////////
   // Get tests
   //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test for hbase-1686.
+   * @throws IOException
+   */
+  public void testEmptyStoreFile() throws IOException {
+    init(this.getName());
+    // Write a store file.
+    this.store.add(new KeyValue(row, family, qf1, null));
+    this.store.add(new KeyValue(row, family, qf2, null));
+    flush(1);
+    // Now put in place an empty store file.  Its a little tricky.  Have to
+    // do manually with hacked in sequence id.
+    StoreFile f = this.store.getStorefiles().firstEntry().getValue();
+    Path storedir = f.getPath().getParent();
+    long seqid = f.getMaxSequenceId();
+    HBaseConfiguration c = new HBaseConfiguration();
+    FileSystem fs = FileSystem.get(c);
+    Writer w = StoreFile.getWriter(fs, storedir);
+    StoreFile.appendMetadata(w, seqid + 1);
+    w.close();
+    this.store.close();
+    // Reopen it... should pick up two files
+    this.store = new Store(storedir.getParent().getParent(),
+      this.store.getHRegionInfo(),
+      this.store.getFamily(), fs, null, c, null);
+    System.out.println(this.store.getHRegionInfo().getEncodedName());
+    assertEquals(2, this.store.getStorefilesCount());
+    this.store.get(get, qualifiers, result);
+    assertEquals(1, result.size());
+  }
+
   /**
    * Getting data from memstore only
    * @throws IOException