You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/13 22:24:54 UTC

svn commit: r1183070 - in /hbase/branches/0.92: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Author: nspiegelberg
Date: Thu Oct 13 20:24:53 2011
New Revision: 1183070

URL: http://svn.apache.org/viewvc?rev=1183070&view=rev
Log:
HBASE-4078 Validate store files after flush/compaction

Modified:
    hbase/branches/0.92/CHANGES.txt
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1183070&r1=1183069&r2=1183070&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Thu Oct 13 20:24:53 2011
@@ -335,6 +335,7 @@ Release 0.92.0 - Unreleased
                output file (Mingjie Lai)
    HBASE-4582  Store.java cleanup (failing TestHeapSize and has warnings)
    HBASE-4556  Fix all incorrect uses of InternalScanner.next(...) (Lars H)
+   HBASE-4078  Validate store files after flush/compaction
 
   TESTS
    HBASE-4492  TestRollingRestart fails intermittently

Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1183070&r1=1183069&r2=1183070&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Oct 13 20:24:53 2011
@@ -520,6 +520,7 @@ public class Store implements HeapSize {
 
     // Write-out finished successfully, move into the right spot
     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
+    validateStoreFile(writer.getPath());
     String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
     LOG.info(msg);
     status.setStatus("Flushing " + this + ": " + msg);
@@ -1091,7 +1092,7 @@ public class Store implements HeapSize {
    * nothing made it through the compaction.
    * @throws IOException
    */
-  private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
@@ -1193,6 +1194,30 @@ public class Store implements HeapSize {
     return writer;
   }
 
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should
+   * not be an expensive operation.
+   * @param path the path to the store file
+   * @throws IOException if creating the StoreFile or its reader fails (logged, rethrown)
+   */
+  private void validateStoreFile(Path path)
+      throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(this.fs, path, this.conf,
+          this.cacheConf, this.family.getBloomFilterType());
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : " + path
+          + ", keeping it in tmp location", e);
+      throw e;
+    } finally {
+      if (storeFile != null) { // still null when the StoreFile constructor threw
+        storeFile.closeReader();
+      }
+    }
+  }
+
   /*
    * <p>It works by processing a compaction that's been written to disk.
    *
@@ -1212,13 +1237,14 @@ public class Store implements HeapSize {
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
+  StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
                                        final StoreFile.Writer compactedFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
     // be if all cells were expired or deleted).
     StoreFile result = null;
     if (compactedFile != null) {
+      validateStoreFile(compactedFile.getPath());
       Path p = null;
       try {
         p = StoreFile.rename(this.fs, compactedFile.getPath(),

Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1183070&r1=1183069&r2=1183070&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Thu Oct 13 20:24:53 2011
@@ -24,14 +24,19 @@ import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.junit.Assert.fail;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
@@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -478,4 +484,41 @@ public class TestCompaction extends HBas
     		"bbb").getBytes(), null);
     loader.flushcache();
   }
+
+  /**
+   * Completing a compaction whose output file was overwritten with garbage
+   * must throw, and must leave that file in the 'tmp' directory.
+   */
+  public void testCompactionWithCorruptResult() throws Exception {
+    int nfiles = 10;
+    for (int i = 0; i < nfiles; i++) {
+      createStoreFile(r);
+    }
+    Store store = r.getStore(COLUMN_FAMILY);
+    List<StoreFile> storeFiles = store.getStorefiles();
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+    StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+
+    // Now let's corrupt the compacted file.
+    FileSystem fs = cluster.getFileSystem();
+    Path origPath = compactedFile.getPath();
+    Path dstPath = new Path(store.getHomedir(), origPath.getName());
+    FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
+        (long) 1024, null);
+    stream.writeChars("CORRUPT FILE!!!!");
+    stream.close();
+
+    try {
+      store.completeCompaction(storeFiles, compactedFile);
+    } catch (Exception e) {
+      // Use fail(): the assert keyword is a no-op unless the JVM runs with -ea.
+      if (!fs.exists(origPath) || fs.exists(dstPath)) {
+        fail("corrupt file should stay at " + origPath + ", not move to " + dstPath);
+      }
+      System.out.println("testCompactionWithCorruptResult Passed");
+      return;
+    }
+    fail("testCompactionWithCorruptResult failed since no exception was " +
+        "thrown while completing a corrupt file");
+  }
 }