You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:41:47 UTC

svn commit: r1181914 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/regionserver/

Author: nspiegelberg
Date: Tue Oct 11 17:41:46 2011
New Revision: 1181914

URL: http://svn.apache.org/viewvc?rev=1181914&view=rev
Log:
(HBASE-4078) Move corrupt store files out of main store file directory.

Summary:
During compaction of store files, the newly created store file is sometimes
corrupt. The current logic handles this by keeping the corrupt store file around
without changing the list of store files served. For example, if A, B and C
were compacted to D and D was corrupt, the regionserver still keeps 'D' around
but maintains the current list of store files as A, B, C. The problem is that
when the region moves, the new regionserver (RS) sees A, B, C and D, tries to
open 'D' as well, and gets an error. This keeps happening as the region moves
around.

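The fix for now is to keep 'D' out of the main store files directory, leaving
it in a 'tmp' location so that regionservers don't pick it up. The new store
file is validated (opened and closed) before it is renamed into the store
directory; if validation fails, the rename is skipped and an error is raised.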

Test Plan:
1) Run TestCompaction, TestCompactSelection.
2) Write a unit test which tests this specific case.

Reviewed By: nspiegelberg
Reviewers: kranganathan, kannan, mbautin, nspiegelberg
Commenters: kannan, dhruba, mbautin, liyintang
CC: hbase@lists, nspiegelberg, kannan, pritam, dhruba,
mbautin, liyintang
Revert Plan:
Tags:

- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -

Differential Revision: 278958
Task ID: 624621

Added:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181914&r1=1181913&r2=1181914&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 17:41:46 2011
@@ -834,13 +834,16 @@ public class HRegionServer implements HR
    */
   private Throwable cleanup(final Throwable t, final String msg) {
     if (t instanceof NotServingRegionException) {
-        LOG.info(t.toString());
+      // In case of NotServingRegionException we should not make any sanity
+      // checks for the FileSystem or OOM.
+      LOG.info(t.toString());
+      return t;
     } else {
-        if (msg == null) {
-          LOG.error("", RemoteExceptionHandler.checkThrowable(t));
-        } else {
+      if (msg == null) {
+        LOG.error("", RemoteExceptionHandler.checkThrowable(t));
+      } else {
         LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
-        }
+      }
     }
     if (!checkOOME(t)) {
       checkFileSystem();
@@ -903,9 +906,17 @@ public class HRegionServer implements HR
   public boolean checkFileSystem() {
     if (this.fsOk && this.fs != null) {
       try {
-        FSUtils.checkFileSystemAvailable(this.fs);
+        FSUtils.checkFileSystemAvailable(this.fs, false);
       } catch (IOException e) {
         abort("File System not available", e);
+        // Wait for all threads to exit cleanly.
+        join();
+        // Finally attempt to close the Filesystem, to flush out any open streams.
+        try {
+          this.fs.close();
+        } catch (IOException ie) {
+          LOG.error("Could not close FileSystem", ie);
+        }
         this.fsOk = false;
       }
     }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181914&r1=1181913&r2=1181914&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 17:41:46 2011
@@ -482,8 +482,11 @@ public class Store implements HeapSize {
       }
     }
 
-    // Write-out finished successfully, move into the right spot
     Path dstPath = new Path(homedir, fileName);
+
+    validateStoreFile(writer.getPath());
+
+    // Write-out finished successfully, move into the right spot
     LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
     fs.rename(writer.getPath(), dstPath);
 
@@ -1009,7 +1012,7 @@ public class Store implements HeapSize {
    * nothing made it through the compaction.
    * @throws IOException
    */
-  private StoreFile.Writer compactStores(final Collection<StoreFile> filesToCompact,
+  StoreFile.Writer compactStores(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
@@ -1091,6 +1094,31 @@ public class Store implements HeapSize {
     return writer;
   }
 
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should
+   * not be an expensive operation.
+   *
+   * @param path
+   *          the path to the store file
+   */
+  private void validateStoreFile(Path path)
+      throws IOException {
+    StoreFile storeFile = null;
+    try {
+      storeFile = new StoreFile(this.fs, path, blockcache, this.conf,
+          this.family.getBloomFilterType(), this.inMemory);
+      storeFile.createReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : " + path
+          + ", keeping it in tmp location", e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeReader();
+      }
+    }
+  }
+
   /*
    * <p>It works by processing a compaction that's been written to disk.
    *
@@ -1110,16 +1138,19 @@ public class Store implements HeapSize {
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
+  StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
                                        final StoreFile.Writer compactedFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
     // be if all cells were expired or deleted).
     StoreFile result = null;
     if (compactedFile != null) {
-      // Move file into the right spot
       Path origPath = compactedFile.getPath();
       Path dstPath = new Path(homedir, origPath.getName());
+
+      validateStoreFile(origPath);
+
+      // Move file into the right spot
       LOG.info("Renaming compacted file at " + origPath + " to " + dstPath);
       if (!fs.rename(origPath, dstPath)) {
         LOG.error("Failed move of compacted file " + origPath + " to " +

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1181914&r1=1181913&r2=1181914&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Oct 11 17:41:46 2011
@@ -113,13 +113,23 @@ public class FSUtils {
     return p;
   }
 
+  public static void checkFileSystemAvailable(final FileSystem fs)
+      throws IOException {
+    checkFileSystemAvailable(fs, true);
+  }
+
   /**
    * Checks to see if the specified file system is available
    *
-   * @param fs filesystem
-   * @throws IOException e
+   * @param fs
+   *          filesystem
+   * @param shutdown
+   *          whether or not to shutdown the filesystem.
+   * @throws IOException
+   *           e
    */
-  public static void checkFileSystemAvailable(final FileSystem fs)
+  public static void checkFileSystemAvailable(final FileSystem fs,
+      boolean shutdown)
   throws IOException {
     if (!(fs instanceof DistributedFileSystem)) {
       return;
@@ -133,10 +143,12 @@ public class FSUtils {
     } catch (IOException e) {
       exception = RemoteExceptionHandler.checkIOException(e);
     }
-    try {
-      fs.close();
-    } catch (Exception e) {
+    if (shutdown) {
+      try {
+        fs.close();
+      } catch (Exception e) {
         LOG.error("file system close failed: ", e);
+      }
     }
     IOException io = new IOException("File system is not available");
     io.initCause(exception);

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1181914&r1=1181913&r2=1181914&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Oct 11 17:41:46 2011
@@ -21,16 +21,22 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import static org.junit.Assert.fail;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -457,4 +463,42 @@ public class TestCompaction extends HBas
     		"bbb").getBytes(), null);
     loader.flushcache();
   }
+
+  public void testCompactionWithCorruptResult() throws Exception {
+    int nfiles = 10;
+    for (int i = 0; i < nfiles; i++) {
+      createStoreFile(r);
+    }
+    Store store = r.getStore(COLUMN_FAMILY);
+
+    List<StoreFile> storeFiles = store.getStorefiles();
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+
+    StoreFile.Writer compactedFile = store.compactStores(storeFiles, false, maxId);
+
+    // Now lets corrupt the compacted file.
+    FileSystem fs = cluster.getFileSystem();
+    Path origPath = compactedFile.getPath();
+    Path homedir = store.getHomedir();
+    Path dstPath = new Path(homedir, origPath.getName());
+    FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
+        (long) 1024,
+        null);
+    stream.writeChars("CORRUPT FILE!!!!");
+    stream.close();
+
+    try {
+      store.completeCompaction(storeFiles, compactedFile);
+    } catch (Exception e) {
+      // The complete compaction should fail and the corrupt file should remain
+      // in the 'tmp' directory;
+      assert (fs.exists(origPath));
+      assert (!fs.exists(dstPath));
+      e.printStackTrace();
+      System.out.println("testCompactionWithCorruptResult Passed");
+      return;
+    }
+    fail("testCompactionWithCorruptResult failed since no exception was" +
+        "thrown while completing a corrupt file");
+  }
 }

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java?rev=1181914&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerFileSystemFailure.java Tue Oct 11 17:41:46 2011
@@ -0,0 +1,87 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import static org.junit.Assert.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHRegionServerFileSystemFailure {
+  private static final Log LOG = LogFactory
+      .getLog(TestHRegionServerFileSystemFailure.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[][] FAMILIES = { Bytes.toBytes("f1"),
+      Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4") };
+  private static final int nLoaders = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static class TableLoader extends Thread {
+    private final HTable table;
+
+    public TableLoader(HTable table) {
+      this.table = table;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          for (int i = 0; i < FAMILIES.length; i++) {
+            byte[] columnFamily = FAMILIES[i];
+            TEST_UTIL.loadTable(table, columnFamily);
+          }
+        } catch (IOException e) {
+          LOG.warn(e);
+          break;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testHRegionServerFileSystemFailure() throws Exception {
+    // Build some data.
+    byte[] tableName = Bytes.toBytes("testCloseHRegion");
+    TEST_UTIL.createTable(tableName, FAMILIES);
+    HTable table = new HTable(tableName);
+    for (int i = 0; i < FAMILIES.length; i++) {
+      byte[] columnFamily = FAMILIES[i];
+      TEST_UTIL.createMultiRegions(table, columnFamily);
+    }
+
+    for (int i = 0; i < nLoaders; i++) {
+      new TableLoader(table).start();
+    }
+
+    // Wait for loaders to build up some data.
+    Thread.sleep(10000);
+
+    // Pick a regionserver.
+    Configuration conf = TEST_UTIL.getConfiguration();
+    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+
+    // Bring down HDFS.
+    TEST_UTIL.shutdownMiniDFSCluster();
+
+    // Verify checkFileSystem returns false and doesn't throw Exceptions.
+    assertFalse(server.checkFileSystem());
+  }
+}