Posted to commits@hbase.apache.org by jm...@apache.org on 2012/08/28 03:55:22 UTC

svn commit: r1377941 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/util/ main/java/org/apache/had...

Author: jmhsieh
Date: Tue Aug 28 01:55:21 2012
New Revision: 1377941

URL: http://svn.apache.org/viewvc?rev=1377941&view=rev
Log:
HBASE-6586 Quarantine Corrupted HFiles with hbck

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CorruptHFileException.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/DoNotRetryIOException.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/DoNotRetryIOException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/DoNotRetryIOException.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/DoNotRetryIOException.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/DoNotRetryIOException.java Tue Aug 28 01:55:21 2012
@@ -19,8 +19,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -30,7 +28,7 @@ import org.apache.hadoop.classification.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class DoNotRetryIOException extends IOException {
+public class DoNotRetryIOException extends HBaseIOException {
 
   private static final long serialVersionUID = 1197446454511704139L;
 

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java?rev=1377941&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java Tue Aug 28 01:55:21 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * All hbase specific IOExceptions should be subclasses of HBaseIOException
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HBaseIOException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public HBaseIOException() {
+    super();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public HBaseIOException(String message) {
+    super(message);
+  }
+
+  /**
+   * {@inheritDoc}
+   **/
+  public HBaseIOException(String message, Throwable cause) {
+      super(message, cause);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public HBaseIOException(Throwable cause) {
+      super(cause);
+  }}
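
With DoNotRetryIOException now extending HBaseIOException, HBase-specific IOExceptions share a common parent. A minimal sketch of what a new exception type following the same pattern would look like (the class name is hypothetical, not part of this commit):

    package org.apache.hadoop.hbase;

    // Hypothetical example only: new HBase-specific IOExceptions extend
    // HBaseIOException rather than java.io.IOException directly.
    public class ExampleHBaseIOException extends HBaseIOException {
      private static final long serialVersionUID = 1L;

      public ExampleHBaseIOException(String message) {
        super(message);
      }

      public ExampleHBaseIOException(String message, Throwable cause) {
        super(message, cause);
      }
    }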

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CorruptHFileException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CorruptHFileException.java?rev=1377941&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CorruptHFileException.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CorruptHFileException.java Tue Aug 28 01:55:21 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * This exception is thrown when attempts to read an HFile fail due to corruption or truncation
+ * issues.
+ */
+@InterfaceAudience.Private
+public class CorruptHFileException extends DoNotRetryIOException {
+  private static final long serialVersionUID = 1L;
+
+  CorruptHFileException(String m, Throwable t) {
+    super(m, t);
+  }
+
+  CorruptHFileException(String m) {
+    super(m);
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java Tue Aug 28 01:55:21 2012
@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
+import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -34,9 +37,6 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.RawComparator;
 
-import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
-import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
-
 import com.google.common.io.NullOutputStream;
 
 /**
@@ -322,12 +322,7 @@ public class FixedFileTrailer {
     int majorVersion = extractMajorVersion(version);
     int minorVersion = extractMinorVersion(version);
 
-    try {
-      HFile.checkFormatVersion(majorVersion);
-    } catch (IllegalArgumentException iae) {
-      // In this context, an invalid version might indicate a corrupt HFile.
-      throw new IOException(iae);
-    }
+    HFile.checkFormatVersion(majorVersion); // throws IAE if invalid
 
     int trailerSize = getTrailerSize(majorVersion);
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Aug 28 01:55:21 2012
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -521,12 +520,32 @@ public class HFile {
     DataBlockEncoding getEncodingOnDisk();
   }
 
+  /**
+   * Method returns the reader given the specified arguments.
+   * TODO This is a bad abstraction.  See HBASE-6635.
+   *
+   * @param path hfile's path
+   * @param fsdis an open checksummed stream of path's file
+   * @param fsdisNoFsChecksum an open unchecksummed stream of path's file
+   * @param size max size of the trailer.
+   * @param closeIStream boolean for closing the file after getting the reader.
+   * @param cacheConf Cache configuration values, cannot be null.
+   * @param preferredEncodingInCache
+   * @param hfs
+   * @return an appropriate instance of HFileReader
+   * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
+   */
   private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
       FSDataInputStream fsdisNoFsChecksum,
       long size, boolean closeIStream, CacheConfig cacheConf,
       DataBlockEncoding preferredEncodingInCache, HFileSystem hfs)
       throws IOException {
-    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
+    FixedFileTrailer trailer = null;
+    try {
+      trailer = FixedFileTrailer.readFromStream(fsdis, size);
+    } catch (IllegalArgumentException iae) {
+      throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
+    }
     switch (trailer.getMajorVersion()) {
     case 1:
       return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
@@ -536,11 +555,18 @@ public class HFile {
           size, closeIStream,
           cacheConf, preferredEncodingInCache, hfs);
     default:
-      throw new IOException("Cannot instantiate reader for HFile version " +
-          trailer.getMajorVersion());
+      throw new CorruptHFileException("Invalid HFile version " + trailer.getMajorVersion());
     }
   }
 
+  /**
+   * @param fs A file system
+   * @param path Path to HFile
+   * @param cacheConf Cache configuration for hfile's contents
+   * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
+   * @return A version specific Hfile Reader
+   * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
+   */
   public static Reader createReaderWithEncoding(
       FileSystem fs, Path path, CacheConfig cacheConf,
       DataBlockEncoding preferredEncodingInCache) throws IOException {
@@ -570,7 +596,8 @@ public class HFile {
    * @param fs filesystem
    * @param path Path to file to read
    * @param cacheConf This must not be null.  @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
-   * @return an active Reader instance.
+   * @return an active Reader instance
+   * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
    */
   public static Reader createReader(
       FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
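
With the trailer read wrapped as above, callers of HFile.createReader can tell a structurally bad file apart from a transient read failure by catching CorruptHFileException, which is exactly how the new HFileCorruptionChecker uses it. A minimal sketch of that calling pattern, assuming an open FileSystem and a CacheConfig are available (the helper class and variable names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    // Illustrative helper, not part of this commit.
    class HFileProbe {
      /** Opens the hfile at p; treats CorruptHFileException as "the file is bad, do not retry". */
      void probe(FileSystem fs, Path p, CacheConfig cacheConf) throws IOException {
        HFile.Reader reader = null;
        try {
          reader = HFile.createReader(fs, p, cacheConf);
        } catch (CorruptHFileException che) {
          // bad or truncated trailer, or an unsupported version; report or quarantine p
          return;
        } finally {
          if (reader != null) reader.close(true);
        }
      }
    }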

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Tue Aug 28 01:55:21 2012
@@ -498,7 +498,7 @@ public class HLogSplitter {
       final List<Path> processedLogs, final Path oldLogDir,
       final FileSystem fs, final Configuration conf) throws IOException {
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
-        "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+        "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
 
     if (!fs.mkdirs(corruptDir)) {
       LOG.info("Unable to mkdir " + corruptDir);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Aug 28 01:55:21 2012
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +49,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.DeserializationException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -1033,6 +1035,114 @@ public abstract class FSUtils {
   }
 
   /**
+   * Filter for region dirs; accepts only dirs with valid region names (hex or older numeric), which excludes files and dirs starting with '.'
+   */
+  public static class RegionDirFilter implements PathFilter {
+    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
+    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
+    final FileSystem fs;
+
+    public RegionDirFilter(FileSystem fs) {
+      this.fs = fs;
+    }
+
+    @Override
+    public boolean accept(Path rd) {
+      if (!regionDirPattern.matcher(rd.getName()).matches()) {
+        return false;
+      }
+
+      try {
+        return fs.getFileStatus(rd).isDir();
+      } catch (IOException ioe) {
+        // Maybe the file was moved or the fs was disconnected.
+        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
+   * .tableinfo
+   * @param fs A file system for the Path
+   * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
+   * @return List of paths to valid region directories in table dir.
+   * @throws IOException
+   */
+  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
+    // assumes we are in a table dir.
+    FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
+    List<Path> regionDirs = new ArrayList<Path>(rds.length);
+    for (FileStatus rdfs: rds) {
+      Path rdPath = rdfs.getPath();
+      regionDirs.add(rdPath);
+    }
+    return regionDirs;
+  }
+
+  /**
+   * Filter for all dirs that are legal column family names.  This is generally used for colfam
+   * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
+   */
+  public static class FamilyDirFilter implements PathFilter {
+    final FileSystem fs;
+
+    public FamilyDirFilter(FileSystem fs) {
+      this.fs = fs;
+    }
+
+    @Override
+    public boolean accept(Path rd) {
+      try {
+        // throws IAE if invalid
+        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
+      } catch (IllegalArgumentException iae) {
+        // path name is an invalid family name and thus is excluded.
+        return false;
+      }
+
+      try {
+        return fs.getFileStatus(rd).isDir();
+      } catch (IOException ioe) {
+        // Maybe the file was moved or the fs was disconnected.
+        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Filter for HFiles that excludes reference files.
+   */
+  public static class HFileFilter implements PathFilter {
+    // This pattern will accept 0.90+ style hex hfiles but reject reference files
+    final public static Pattern hfilePattern = Pattern.compile("^([0-9a-f]+)$");
+
+    final FileSystem fs;
+
+    public HFileFilter(FileSystem fs) {
+      this.fs = fs;
+    }
+
+    @Override
+    public boolean accept(Path rd) {
+      if (!hfilePattern.matcher(rd.getName()).matches()) {
+        return false;
+      }
+
+      try {
+        // only files
+        return !fs.getFileStatus(rd).isDir();
+      } catch (IOException ioe) {
+        // Maybe the file was moved or the fs was disconnected.
+        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        return false;
+      }
+    }
+  }
+
+  /**
    * @param conf
    * @return Returns the filesystem of the hbase rootdir.
    * @throws IOException
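
Taken together, RegionDirFilter, FamilyDirFilter and HFileFilter let a caller walk <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir> and enumerate data files while skipping reference files and non-hfile entries, which is how HFileCorruptionChecker uses them below. A minimal sketch of that traversal, assuming an open FileSystem and a table path (the class name is illustrative):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
    import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;

    // Illustrative only, not part of this commit.
    class HFileLister {
      void listHFiles(FileSystem fs, Path tableDir) throws IOException {
        // region dirs via the new helper, then family dirs, then the hfiles themselves
        List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
        for (Path regionDir : regionDirs) {
          for (FileStatus cf : fs.listStatus(regionDir, new FamilyDirFilter(fs))) {
            for (FileStatus hf : fs.listStatus(cf.getPath(), new HFileFilter(fs))) {
              System.out.println(hf.getPath());
            }
          }
        }
      }
    }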

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Tue Aug 28 01:55:21 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,6 +37,7 @@ import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -83,6 +85,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
+import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
@@ -164,8 +167,10 @@ public class HBaseFsck {
   private HConnection connection;
   private HBaseAdmin admin;
   private HTable meta;
-  private ScheduledThreadPoolExecutor executor; // threads to retrieve data from regionservers
+  protected ExecutorService executor; // threads to retrieve data from regionservers
   private long startMillis = System.currentTimeMillis();
+  private HFileCorruptionChecker hfcc;
+  private int retcode = 0;
 
   /***********
    * Options
@@ -243,6 +248,22 @@ public class HBaseFsck {
   }
 
   /**
+   * Constructor
+   *
+   * @param conf
+   *          Configuration object
+   * @throws MasterNotRunningException
+   *           if the master is not running
+   * @throws ZooKeeperConnectionException
+   *           if unable to connect to ZooKeeper
+   */
+  public HBaseFsck(Configuration conf, ExecutorService exec) throws MasterNotRunningException,
+      ZooKeeperConnectionException, IOException {
+    this.conf = conf;
+    this.executor = exec;
+  }
+
+  /**
    * To repair region consistency, one must call connect() in order to repair
    * online state.
    */
@@ -3084,6 +3105,10 @@ public class HBaseFsck {
     tablesIncluded.add(table);
   }
 
+  Set<String> getIncludedTables() {
+    return new HashSet<String>(tablesIncluded);
+  }
+
   /**
    * We are interested in only those tables that have not changed their state in
    * META during the last few seconds specified by hbase.admin.fsck.timelag
@@ -3101,7 +3126,27 @@ public class HBaseFsck {
     this.sidelineDir = new Path(sidelineDir);
   }
 
-  protected static void printUsageAndExit() {
+  protected HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
+    return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles);
+  }
+
+  public HFileCorruptionChecker getHFilecorruptionChecker() {
+    return hfcc;
+  }
+
+  public void setHFileCorruptionChecker(HFileCorruptionChecker hfcc) {
+    this.hfcc = hfcc;
+  }
+
+  public void setRetCode(int code) {
+    this.retcode = code;
+  }
+
+  public int getRetCode() {
+    return retcode;
+  }
+
+  protected HBaseFsck printUsageAndExit() {
     System.err.println("Usage: fsck [opts] {only tables}");
     System.err.println(" where [opts] are:");
     System.err.println("   -help Display help options (this)");
@@ -3115,7 +3160,8 @@ public class HBaseFsck {
     System.err.println("   -metaonly Only check the state of ROOT and META tables.");
     System.err.println("   -sidelineDir <hdfs://> HDFS path to backup existing meta and root.");
 
-    System.err.println("  Repair options: (expert features, use with caution!)");
+    System.err.println("");
+    System.err.println("  Metadata Repair options: (expert features, use with caution!)");
     System.err.println("   -fix              Try to fix region assignments.  This is for backwards compatiblity");
     System.err.println("   -fixAssignments   Try to fix region assignments.  Replaces the old -fix");
     System.err.println("   -fixMeta          Try to fix meta problems.  This assumes HDFS region info is good.");
@@ -3128,16 +3174,25 @@ public class HBaseFsck {
     System.err.println("   -maxOverlapsToSideline <n>  When fixing region overlaps, allow at most <n> regions to sideline per group. (n=" + DEFAULT_OVERLAPS_TO_SIDELINE +" by default)");
     System.err.println("   -fixSplitParents  Try to force offline split parents to be online.");
     System.err.println("   -ignorePreCheckPermission  ignore filesystem permission pre-check");
+
+    System.err.println("");
+    System.err.println("  Datafile Repair options: (expert features, use with caution!)");
+    System.err.println("   -checkCorruptHFiles     Check all Hfiles by opening them to make sure they are valid");
+    System.err.println("   -sidelineCorruptHfiles  Quarantine corrupted HFiles.  implies -checkCorruptHfiles");
+
     System.err.println("");
+    System.err.println("  Metadata Repair shortcuts");
     System.err.println("   -repair           Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " +
         "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps");
     System.err.println("   -repairHoles      Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
 
-    Runtime.getRuntime().exit(-2);
+    setRetCode(-2);
+    return this;
   }
 
   /**
    * Main program
+   *
    * @param args
    * @throws Exception
    */
@@ -3149,162 +3204,204 @@ public class HBaseFsck {
     URI defaultFs = hbasedir.getFileSystem(conf).getUri();
     conf.set("fs.defaultFS", defaultFs.toString());     // for hadoop 0.21+
     conf.set("fs.default.name", defaultFs.toString());  // for hadoop 0.20
-    HBaseFsck fsck = new HBaseFsck(conf);
+
+    int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
+    ExecutorService exec = new ScheduledThreadPoolExecutor(numThreads);
+    HBaseFsck hbck = new HBaseFsck(conf, exec);
+    hbck.exec(exec, args);
+    int retcode = hbck.getRetCode();
+    Runtime.getRuntime().exit(retcode);
+  }
+
+  public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
+    ServiceException, InterruptedException {
     long sleepBeforeRerun = DEFAULT_SLEEP_BEFORE_RERUN;
 
+    boolean checkCorruptHFiles = false;
+    boolean sidelineCorruptHFiles = false;
+
     // Process command-line args.
     for (int i = 0; i < args.length; i++) {
       String cmd = args[i];
       if (cmd.equals("-help") || cmd.equals("-h")) {
-        printUsageAndExit();
+        return printUsageAndExit();
       } else if (cmd.equals("-details")) {
-        fsck.setDisplayFullReport();
+        setDisplayFullReport();
       } else if (cmd.equals("-timelag")) {
         if (i == args.length - 1) {
           System.err.println("HBaseFsck: -timelag needs a value.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         try {
           long timelag = Long.parseLong(args[i+1]);
-          fsck.setTimeLag(timelag);
+          setTimeLag(timelag);
         } catch (NumberFormatException e) {
           System.err.println("-timelag needs a numeric value.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         i++;
       } else if (cmd.equals("-sleepBeforeRerun")) {
         if (i == args.length - 1) {
           System.err.println("HBaseFsck: -sleepBeforeRerun needs a value.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         try {
           sleepBeforeRerun = Long.parseLong(args[i+1]);
         } catch (NumberFormatException e) {
           System.err.println("-sleepBeforeRerun needs a numeric value.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         i++;
       } else if (cmd.equals("-sidelineDir")) {
         if (i == args.length - 1) {
           System.err.println("HBaseFsck: -sidelineDir needs a value.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         i++;
-        fsck.setSidelineDir(args[i]);
+        setSidelineDir(args[i]);
       } else if (cmd.equals("-fix")) {
         System.err.println("This option is deprecated, please use " +
           "-fixAssignments instead.");
-        fsck.setFixAssignments(true);
+        setFixAssignments(true);
       } else if (cmd.equals("-fixAssignments")) {
-        fsck.setFixAssignments(true);
+        setFixAssignments(true);
       } else if (cmd.equals("-fixMeta")) {
-        fsck.setFixMeta(true);
+        setFixMeta(true);
       } else if (cmd.equals("-fixHdfsHoles")) {
-        fsck.setFixHdfsHoles(true);
+        setFixHdfsHoles(true);
       } else if (cmd.equals("-fixHdfsOrphans")) {
-        fsck.setFixHdfsOrphans(true);
+        setFixHdfsOrphans(true);
       } else if (cmd.equals("-fixHdfsOverlaps")) {
-        fsck.setFixHdfsOverlaps(true);
+        setFixHdfsOverlaps(true);
       } else if (cmd.equals("-fixVersionFile")) {
-        fsck.setFixVersionFile(true);
+        setFixVersionFile(true);
       } else if (cmd.equals("-sidelineBigOverlaps")) {
-        fsck.setSidelineBigOverlaps(true);
+        setSidelineBigOverlaps(true);
       } else if (cmd.equals("-fixSplitParents")) {
-        fsck.setFixSplitParents(true);
+        setFixSplitParents(true);
       } else if (cmd.equals("-ignorePreCheckPermission")) {
-        fsck.setIgnorePreCheckPermission(true);
+        setIgnorePreCheckPermission(true);
+      } else if (cmd.equals("-checkCorruptHFiles")) {
+        checkCorruptHFiles = true;
+      } else if (cmd.equals("-sidelineCorruptHFiles")) {
+        sidelineCorruptHFiles = true;
       } else if (cmd.equals("-repair")) {
         // this attempts to merge overlapping hdfs regions, needs testing
         // under load
-        fsck.setFixHdfsHoles(true);
-        fsck.setFixHdfsOrphans(true);
-        fsck.setFixMeta(true);
-        fsck.setFixAssignments(true);
-        fsck.setFixHdfsOverlaps(true);
-        fsck.setFixVersionFile(true);
-        fsck.setSidelineBigOverlaps(true);
-        fsck.setFixSplitParents(false);
+        setFixHdfsHoles(true);
+        setFixHdfsOrphans(true);
+        setFixMeta(true);
+        setFixAssignments(true);
+        setFixHdfsOverlaps(true);
+        setFixVersionFile(true);
+        setSidelineBigOverlaps(true);
+        setFixSplitParents(false);
       } else if (cmd.equals("-repairHoles")) {
         // this will make all missing hdfs regions available but may lose data
-        fsck.setFixHdfsHoles(true);
-        fsck.setFixHdfsOrphans(false);
-        fsck.setFixMeta(true);
-        fsck.setFixAssignments(true);
-        fsck.setFixHdfsOverlaps(false);
-        fsck.setSidelineBigOverlaps(false);
-        fsck.setFixSplitParents(false);
+        setFixHdfsHoles(true);
+        setFixHdfsOrphans(false);
+        setFixMeta(true);
+        setFixAssignments(true);
+        setFixHdfsOverlaps(false);
+        setSidelineBigOverlaps(false);
+        setFixSplitParents(false);
       } else if (cmd.equals("-maxOverlapsToSideline")) {
         if (i == args.length - 1) {
           System.err.println("-maxOverlapsToSideline needs a numeric value argument.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         try {
           int maxOverlapsToSideline = Integer.parseInt(args[i+1]);
-          fsck.setMaxOverlapsToSideline(maxOverlapsToSideline);
+          setMaxOverlapsToSideline(maxOverlapsToSideline);
         } catch (NumberFormatException e) {
           System.err.println("-maxOverlapsToSideline needs a numeric value argument.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         i++;
       } else if (cmd.equals("-maxMerge")) {
         if (i == args.length - 1) {
           System.err.println("-maxMerge needs a numeric value argument.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         try {
           int maxMerge = Integer.parseInt(args[i+1]);
-          fsck.setMaxMerge(maxMerge);
+          setMaxMerge(maxMerge);
         } catch (NumberFormatException e) {
           System.err.println("-maxMerge needs a numeric value argument.");
-          printUsageAndExit();
+          return printUsageAndExit();
         }
         i++;
       } else if (cmd.equals("-summary")) {
-        fsck.setSummary();
+        setSummary();
       } else if (cmd.equals("-metaonly")) {
-        fsck.setCheckMetaOnly();
+        setCheckMetaOnly();
       } else if (cmd.startsWith("-")) {
         System.err.println("Unrecognized option:" + cmd);
-        printUsageAndExit();
+        return printUsageAndExit();
       } else {
-        fsck.includeTable(cmd);
+        includeTable(cmd);
         System.out.println("Allow checking/fixes for table: " + cmd);
       }
     }
 
     // pre-check current user has FS write permission or not
     try {
-      fsck.preCheckPermission();
+      preCheckPermission();
     } catch (AccessControlException ace) {
       Runtime.getRuntime().exit(-1);
     } catch (IOException ioe) {
       Runtime.getRuntime().exit(-1);
     }
-    // do the real work of fsck
-    fsck.connect();
-    int code = fsck.onlineHbck();
-    // If we have changed the HBase state it is better to run fsck again
+
+    // do the real work of hbck
+    connect();
+
+    // if corrupt file mode is on, first fix them since they may be opened later
+    if (checkCorruptHFiles || sidelineCorruptHFiles) {
+      LOG.info("Checking all hfiles for corruption");
+      HFileCorruptionChecker hfcc = createHFileCorruptionChecker(sidelineCorruptHFiles);
+      setHFileCorruptionChecker(hfcc); // so we can get result
+      Collection<String> tables = getIncludedTables();
+      Collection<Path> tableDirs = new ArrayList<Path>();
+      Path rootdir = FSUtils.getRootDir(conf);
+      if (tables.size() > 0) {
+        for (String t : tables) {
+          tableDirs.add(FSUtils.getTablePath(rootdir, t));
+        }
+      } else {
+        tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(conf), rootdir);
+      }
+      hfcc.checkTables(tableDirs);
+      PrintWriter out = new PrintWriter(System.out);
+      hfcc.report(out);
+      out.flush();
+    }
+
+    // check and fix table integrity, region consistency.
+    int code = onlineHbck();
+    setRetCode(code);
+    // If we have changed the HBase state it is better to run hbck again
     // to see if we haven't broken something else in the process.
     // We run it only once more because otherwise we can easily fall into
     // an infinite loop.
-    if (fsck.shouldRerun()) {
+    if (shouldRerun()) {
       try {
         LOG.info("Sleeping " + sleepBeforeRerun + "ms before re-checking after fix...");
         Thread.sleep(sleepBeforeRerun);
       } catch (InterruptedException ie) {
-        Runtime.getRuntime().exit(code);
+        return this;
       }
       // Just report
-      fsck.setFixAssignments(false);
-      fsck.setFixMeta(false);
-      fsck.setFixHdfsHoles(false);
-      fsck.setFixHdfsOverlaps(false);
-      fsck.setFixVersionFile(false);
-      fsck.errors.resetErrors();
-      code = fsck.onlineHbck();
+      setFixAssignments(false);
+      setFixMeta(false);
+      setFixHdfsHoles(false);
+      setFixHdfsOverlaps(false);
+      setFixVersionFile(false);
+      errors.resetErrors();
+      code = onlineHbck();
+      setRetCode(code);
     }
-
-    Runtime.getRuntime().exit(code);
+    return this;
   }
 
   /**
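
With printUsageAndExit and exec now returning the HBaseFsck instance instead of exiting the JVM, the corruption check can be driven programmatically as well as from the hbck command line (-checkCorruptHFiles / -sidelineCorruptHFiles, optionally followed by table names). A hedged sketch of the programmatic path, mirroring main() above (pool size and output are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.HBaseFsck;
    import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;

    // Illustrative only, not part of this commit.
    public class HbckQuarantineDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ExecutorService exec = new ScheduledThreadPoolExecutor(10);
        HBaseFsck hbck = new HBaseFsck(conf, exec);
        // check every hfile and sideline the corrupt ones, then summarize
        hbck.exec(exec, new String[] { "-sidelineCorruptHFiles" });
        HFileCorruptionChecker hfcc = hbck.getHFilecorruptionChecker();
        System.out.println("corrupted=" + hfcc.getCorrupted().size()
            + " quarantined=" + hfcc.getQuarantined().size());
        System.exit(hbck.getRetCode());
      }
    }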

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java?rev=1377941&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java Tue Aug 28 01:55:21 2012
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.hbck;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
+import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
+import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
+import org.apache.hadoop.hbase.util.HBaseFsck;
+
+/**
+ * This class marches through all of the region's hfiles and verifies that
+ * they are all valid files. One just needs to instantiate the class, use
+ * checkTables(Collection<Path>) and then retrieve the corrupted hfiles (and
+ * quarantined files if in quarantining mode)
+ *
+ * The implementation currently parallelizes at the regionDir level.
+ */
+@InterfaceAudience.Private
+public class HFileCorruptionChecker {
+  private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);
+
+  final Configuration conf;
+  final FileSystem fs;
+  final CacheConfig cacheConf;
+  final ExecutorService executor;
+  final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
+  final Set<Path> failures = new ConcurrentSkipListSet<Path>();
+  final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
+  final Set<Path> missing = new ConcurrentSkipListSet<Path>();
+  final boolean inQuarantineMode;
+  final AtomicInteger hfilesChecked = new AtomicInteger();
+
+  public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
+      boolean quarantine) throws IOException {
+    this.conf = conf;
+    this.fs = FileSystem.get(conf);
+    this.cacheConf = new CacheConfig(conf);
+    this.executor = executor;
+    this.inQuarantineMode = quarantine;
+  }
+
+  /**
+   * Checks a path to see if it is a valid hfile.
+   *
+   * @param p
+   *          full Path to an HFile
+   * @throws IOException
+   *           This is a connectivity related exception
+   */
+  protected void checkHFile(Path p) throws IOException {
+    HFile.Reader r = null;
+    try {
+      r = HFile.createReader(fs, p, cacheConf);
+    } catch (CorruptHFileException che) {
+      LOG.warn("Found corrupt HFile " + p, che);
+      corrupted.add(p);
+      if (inQuarantineMode) {
+        Path dest = createQuarantinePath(p);
+        LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
+        boolean success = fs.mkdirs(dest.getParent());
+        success = success ? fs.rename(p, dest): false;
+        if (!success) {
+          failures.add(p);
+        } else {
+          quarantined.add(dest);
+        }
+      }
+      return;
+    } catch (FileNotFoundException fnfe) {
+      LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
+      missing.add(p);
+    } finally {
+      hfilesChecked.addAndGet(1);
+      if (r != null) {
+        r.close(true);
+      }
+    }
+  }
+
+  /**
+   * Given a path, generates a new path to where we move a corrupted hfile (bad
+   * trailer, no trailer).
+   *
+   * @param hFile
+   *          Path to a corrupt hfile (assumes that it is HBASE_DIR/ table
+   *          /region/cf/file)
+   * @return path to where corrupted files are stored. This should be
+   *         HBASE_DIR/.corrupt/table/region/cf/file.
+   */
+  Path createQuarantinePath(Path hFile) {
+    // extract the normal dirs structure
+    Path cfDir = hFile.getParent();
+    Path regionDir = cfDir.getParent();
+    Path tableDir = regionDir.getParent();
+
+    // build up the corrupted dirs structure
+    Path corruptBaseDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
+        "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
+    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
+    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
+    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
+    Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
+    return corruptHfile;
+  }
+
+  /**
+   * Check all files in a column family dir.
+   *
+   * @param cfDir
+   *          column family directory
+   * @throws IOException
+   */
+  protected void checkColFamDir(Path cfDir) throws IOException {
+    FileStatus[] hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
+    if (hfs.length == 0 && !fs.exists(cfDir)) {
+      // interestingly, listStatus does not throw an exception if the path does not exist.
+      LOG.warn("Colfam Directory " + cfDir +
+          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
+      missing.add(cfDir);
+      return;
+    }
+    for (FileStatus hfFs : hfs) {
+      Path hf = hfFs.getPath();
+      checkHFile(hf);
+    }
+  }
+
+  /**
+   * Check all column families in a region dir.
+   *
+   * @param regionDir
+   *          region directory
+   * @throws IOException
+   */
+  protected void checkRegionDir(Path regionDir) throws IOException {
+    FileStatus[] cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+    if (cfs.length == 0 && !fs.exists(regionDir)) {
+      // interestingly, listStatus does not throw an exception if the path does not exist.
+      LOG.warn("Region Directory " + regionDir +
+          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
+      missing.add(regionDir);
+      return;
+    }
+
+    for (FileStatus cfFs : cfs) {
+      Path cfDir = cfFs.getPath();
+      checkColFamDir(cfDir);
+    }
+  }
+
+  /**
+   * Check all the regiondirs in the specified tableDir
+   *
+   * @param tableDir
+   *          path to a table
+   * @throws IOException
+   */
+  void checkTableDir(Path tableDir) throws IOException {
+    FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
+    if (rds.length == 0 && !fs.exists(tableDir)) {
+      // interestingly listStatus does not throw an exception if the path does not exist.
+      LOG.warn("Table Directory " + tableDir +
+          " does not exist.  Likely due to concurrent delete. Skipping.");
+      missing.add(tableDir);
+      return;
+    }
+
+    // Parallelize check at the region dir level
+    List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
+    List<Future<Void>> rdFutures;
+
+    for (FileStatus rdFs : rds) {
+      Path rdDir = rdFs.getPath();
+      RegionDirChecker work = new RegionDirChecker(rdDir);
+      rdcs.add(work);
+    }
+
+    // Submit and wait for completion
+    try {
+      rdFutures = executor.invokeAll(rdcs);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Region dirs checking interrupted!", ie);
+      return;
+    }
+
+    for (int i = 0; i < rdFutures.size(); i++) {
+      Future<Void> f = rdFutures.get(i);
+      try {
+        f.get();
+      } catch (ExecutionException e) {
+        LOG.warn("Failed to quaratine an HFile in regiondir "
+            + rdcs.get(i).regionDir, e.getCause());
+        // rethrow IOExceptions
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+
+        // rethrow RuntimeExceptions
+        if (e.getCause() instanceof RuntimeException) {
+          throw (RuntimeException) e.getCause();
+        }
+
+        // this should never happen
+        LOG.error("Unexpected exception encountered", e);
+        return; // bailing out.
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Region dirs check interrupted!", ie);
+        // bailing out
+        return;
+      }
+    }
+  }
+
+  /**
+   * An individual work item for parallelized regiondir processing. This is
+   * intentionally an inner class so it can use the shared error sets and fs.
+   */
+  private class RegionDirChecker implements Callable<Void> {
+    final Path regionDir;
+
+    RegionDirChecker(Path regionDir) {
+      this.regionDir = regionDir;
+    }
+
+    @Override
+    public Void call() throws IOException {
+      checkRegionDir(regionDir);
+      return null;
+    }
+  }
+
+  /**
+   * Check the specified table dirs for bad hfiles.
+   */
+  public void checkTables(Collection<Path> tables) throws IOException {
+    for (Path t : tables) {
+      checkTableDir(t);
+    }
+  }
+
+  /**
+   * @return the set of check failure file paths after checkTables is called.
+   */
+  public Collection<Path> getFailures() {
+    return new HashSet<Path>(failures);
+  }
+
+  /**
+   * @return the set of corrupted file paths after checkTables is called.
+   */
+  public Collection<Path> getCorrupted() {
+    return new HashSet<Path>(corrupted);
+  }
+
+  /**
+   * @return number of hfiles checked in the last HFileCorruptionChecker run
+   */
+  public int getHFilesChecked() {
+    return hfilesChecked.get();
+  }
+
+  /**
+   * @return the set of successfully quarantined paths after checkTables is called.
+   */
+  public Collection<Path> getQuarantined() {
+    return new HashSet<Path>(quarantined);
+  }
+
+  /**
+   * @return the set of paths that were missing.  Likely due to deletion/moves from
+   *  compaction or flushes.
+   */
+  public Collection<Path> getMissing() {
+    return new HashSet<Path>(missing);
+  }
+
+  /**
+   * Print a human readable summary of hfile quarantining operations.
+   * @param out
+   */
+  public void report(PrintWriter out) {
+    out.println("Checked " + hfilesChecked.get() + " hfile for corruption");
+    out.println("  HFiles corrupted:                  " + corrupted.size());
+    if (inQuarantineMode) {
+      out.println("    HFiles successfully quarantined: " + quarantined.size());
+      for (Path sq : quarantined) {
+        out.println("      " + sq);
+      }
+      out.println("    HFiles failed quarantine:        " + failures.size());
+      for (Path fq : failures) {
+        out.println("      " + fq);
+      }
+    }
+    out.println("    HFiles moved while checking:     " + missing.size());
+    for (Path mq : missing) {
+      out.println("      " + mq);
+    }
+
+    String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
+    String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
+        : "CORRUPTED";
+
+    if (inQuarantineMode) {
+      out.println("Summary: " + initialState + " => " + fixedState);
+    } else {
+      out.println("Summary: " + initialState);
+    }
+  }
+}
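
As the class comment describes, typical use is to instantiate the checker, hand it a set of table dirs via checkTables, and then pull the corrupted/quarantined/missing sets or print a report. A minimal standalone sketch, mirroring how HBaseFsck.exec drives it (pool size is illustrative, quarantining disabled):

    import java.io.PrintWriter;
    import java.util.Collection;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;

    // Illustrative only, not part of this commit.
    public class CorruptionScan {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ExecutorService exec = new ScheduledThreadPoolExecutor(10);
        // quarantine = false: report corruption without moving any files
        HFileCorruptionChecker hfcc = new HFileCorruptionChecker(conf, exec, false);
        Path rootdir = FSUtils.getRootDir(conf);
        Collection<Path> tableDirs =
            FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(conf), rootdir);
        hfcc.checkTables(tableDirs);
        PrintWriter out = new PrintWriter(System.out);
        hfcc.report(out);
        out.flush();
        exec.shutdown();
      }
    }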

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java Tue Aug 28 01:55:21 2012
@@ -140,7 +140,7 @@ public class TestFixedFileTrailer {
         try {
           readTrailer(trailerPath);
           fail("Exception expected");
-        } catch (IOException ex) {
+        } catch (IllegalArgumentException ex) {
           // Make it easy to debug this.
           String msg = ex.getMessage();
           String cleanMsg = msg.replaceAll(

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java Tue Aug 28 01:55:21 2012
@@ -30,12 +30,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.junit.experimental.categories.Category;
@@ -62,14 +65,12 @@ public class TestHFile extends HBaseTest
 
   @Override
   public void setUp() throws Exception {
-    startingMetrics = SchemaMetrics.getMetricsSnapshot();
     super.setUp();
   }
 
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
-    SchemaMetrics.validateMetricChanges(startingMetrics);
   }
 
 
@@ -90,6 +91,61 @@ public class TestHFile extends HBaseTest
     assertNull(r.getLastKey());
   }
 
+  /**
+   * Create 0-length hfile and show that it fails
+   */
+  public void testCorrupt0LengthHFile() throws IOException {
+    if (cacheConf == null) cacheConf = new CacheConfig(conf);
+    Path f = new Path(ROOT_DIR, getName());
+    FSDataOutputStream fsos = fs.create(f);
+    fsos.close();
+
+    try {
+      Reader r = HFile.createReader(fs, f, cacheConf);
+    } catch (CorruptHFileException che) {
+      // Expected failure
+      return;
+    }
+    fail("Should have thrown exception");
+  }
+
+  public static void truncateFile(FileSystem fs, Path src, Path dst) throws IOException {
+    FileStatus fst = fs.getFileStatus(src);
+    long len = fst.getLen();
+    len = len / 2 ;
+
+    // create a truncated hfile
+    FSDataOutputStream fdos = fs.create(dst);
+    byte[] buf = new byte[(int)len];
+    FSDataInputStream fdis = fs.open(src);
+    fdis.read(buf);
+    fdos.write(buf);
+    fdis.close();
+    fdos.close();
+  }
+
+  /**
+   * Create a truncated hfile and verify that an exception is thrown.
+   */
+  public void testCorruptTruncatedHFile() throws IOException {
+    if (cacheConf == null) cacheConf = new CacheConfig(conf);
+    Path f = new Path(ROOT_DIR, getName());
+    Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(this.fs, f).create();
+    writeSomeRecords(w, 0, 100);
+    w.close();
+
+    Path trunc = new Path(f.getParent(), "trucated");
+    truncateFile(fs, w.getPath(), trunc);
+
+    try {
+      Reader r = HFile.createReader(fs, trunc, cacheConf);
+    } catch (CorruptHFileException che) {
+      // Expected failure
+      return;
+    }
+    fail("Should have thrown exception");
+  }
+
   // write some records into the tfile
   // write them twice
   private int writeSomeRecords(Writer writer, int start, int n)

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java Tue Aug 28 01:55:21 2012
@@ -35,10 +35,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -48,7 +51,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.ServerName;
@@ -63,6 +66,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.io.hfile.TestHFile;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.RegionStates;
@@ -71,6 +75,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
 import org.apache.hadoop.hbase.util.HBaseFsck.HbckInfo;
+import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
+import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -78,18 +84,20 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import com.google.common.collect.Multimap;
 
 /**
  * This tests HBaseFsck's ability to detect reasons for inconsistent tables.
  */
-@Category(MediumTests.class)
+@Category(LargeTests.class)
 public class TestHBaseFsck {
   final static Log LOG = LogFactory.getLog(TestHBaseFsck.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static Configuration conf = TEST_UTIL.getConfiguration();
-  private final static byte[] FAM = Bytes.toBytes("fam");
+  private final static String FAM_STR = "fam";
+  private final static byte[] FAM = Bytes.toBytes(FAM_STR);
   private final static int REGION_ONLINE_TIMEOUT = 800;
   private static RegionStates regionStates;
 
@@ -1275,8 +1283,187 @@ public class TestHBaseFsck {
       deleteTable(table);
     }
   }
-  
+
+  /**
+   * We don't have an easy way to verify that a flush completed, so we loop until we find a
+   * legitimate hfile and return it.
+   * @param fs
+   * @param table
+   * @return Path of a flushed hfile.
+   * @throws IOException
+   */
+  Path getFlushedHFile(FileSystem fs, String table) throws IOException {
+    Path tableDir= FSUtils.getTablePath(FSUtils.getRootDir(conf), table);
+    Path regionDir = FSUtils.getRegionDirs(fs, tableDir).get(0);
+    Path famDir = new Path(regionDir, FAM_STR);
+
+    // keep doing this until we get a legit hfile
+    while (true) {
+      FileStatus[] hfFss = fs.listStatus(famDir);
+      if (hfFss.length == 0) {
+        continue;
+      }
+      for (FileStatus hfs : hfFss) {
+        if (!hfs.isDir()) {
+          return hfs.getPath();
+        }
+      }
+    }
+  }
+
+  /**
+   * This creates a table and then corrupts an hfile.  Hbck should quarantine the file.
+   */
+  @Test(timeout=120000)
+  public void testQuarantineCorruptHFile() throws Exception {
+    String table = name.getMethodName();
+    try {
+      setupTable(table);
+      assertEquals(ROWKEYS.length, countRows());
+      TEST_UTIL.getHBaseAdmin().flush(table); // flush is async.
+
+      FileSystem fs = FileSystem.get(conf);
+      Path hfile = getFlushedHFile(fs, table);
+
+      // Disable the table so we can corrupt an hfile and run the quarantine check safely.
+      TEST_UTIL.getHBaseAdmin().disableTable(table);
+
+      // create new corrupt file called deadbeef (valid hfile name)
+      Path corrupt = new Path(hfile.getParent(), "deadbeef");
+      TestHFile.truncateFile(fs, hfile, corrupt);
+      LOG.info("Created corrupted file " + corrupt);
+      HBaseFsck.debugLsr(conf, FSUtils.getRootDir(conf));
+
+      // We cannot enable the table here because the enable would never finish due to the corrupt region.
+      HBaseFsck res = HbckTestingUtil.doHFileQuarantine(conf, table);
+      assertEquals(res.getRetCode(), 0);
+      HFileCorruptionChecker hfcc = res.getHFilecorruptionChecker();
+      assertEquals(hfcc.getHFilesChecked(), 5);
+      assertEquals(hfcc.getCorrupted().size(), 1);
+      assertEquals(hfcc.getFailures().size(), 0);
+      assertEquals(hfcc.getQuarantined().size(), 1);
+      assertEquals(hfcc.getMissing().size(), 0);
+
+      // It's been fixed; verify that we can enable the table.
+      TEST_UTIL.getHBaseAdmin().enableTable(table);
+    } finally {
+      deleteTable(table);
+    }
+  }
+
+  private void doQuarantineTest(String table, HBaseFsck hbck, int check, int corrupt, int fail,
+      int quar, int missing) throws Exception {
+    try {
+      setupTable(table);
+      assertEquals(ROWKEYS.length, countRows());
+      TEST_UTIL.getHBaseAdmin().flush(table); // flush is async.
+
+      // Disable the table before running the quarantine check.
+      TEST_UTIL.getHBaseAdmin().disableTable(table);
+
+      String[] args = {"-sidelineCorruptHFiles", "-repairHoles", "-ignorePreCheckPermission", table};
+      ExecutorService exec = new ScheduledThreadPoolExecutor(10);
+      HBaseFsck res = hbck.exec(exec, args);
+
+      HFileCorruptionChecker hfcc = res.getHFilecorruptionChecker();
+      assertEquals(hfcc.getHFilesChecked(), check);
+      assertEquals(hfcc.getCorrupted().size(), corrupt);
+      assertEquals(hfcc.getFailures().size(), fail);
+      assertEquals(hfcc.getQuarantined().size(), quar);
+      assertEquals(hfcc.getMissing().size(), missing);
+
+      // It's been fixed; verify that we can enable the table.
+      TEST_UTIL.getHBaseAdmin().enableTable(table);
+    } finally {
+      deleteTable(table);
+    }
+  }
+
+  /**
+   * This creates a table and simulates the race situation where a concurrent compaction or split
+   * has removed an hfile after the corruption checker learned about it.
+   */
+  @Test(timeout=120000)
+  public void testQuarantineMissingHFile() throws Exception {
+    String table = name.getMethodName();
+    ExecutorService exec = new ScheduledThreadPoolExecutor(10);
+    // inject a fault in the hfcc created.
+    final FileSystem fs = FileSystem.get(conf);
+    HBaseFsck hbck = new HBaseFsck(conf, exec) {
+      public HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
+        return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles) {
+          boolean attemptedFirstHFile = false;
+          protected void checkHFile(Path p) throws IOException {
+            if (!attemptedFirstHFile) {
+              attemptedFirstHFile = true;
+              assertTrue(fs.delete(p, true)); // make sure delete happened.
+            }
+            super.checkHFile(p);
+          }
+        };
+      }
+    };
+    doQuarantineTest(table, hbck, 4, 0, 0, 0, 1); // 4 attempted, but 1 missing.
+  }
+
+  /**
+   * This creates a table and simulates the race situation where a concurrent compaction or split
+   * has removed a colfam dir before the corruption checker got to it.
+   */
+  @Test(timeout=120000)
+  public void testQuarantineMissingFamdir() throws Exception {
+    String table = name.getMethodName();
+    ExecutorService exec = new ScheduledThreadPoolExecutor(10);
+    // inject a fault in the hfcc created.
+    final FileSystem fs = FileSystem.get(conf);
+    HBaseFsck hbck = new HBaseFsck(conf, exec) {
+      public HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
+        return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles) {
+          boolean attemptedFirstFamDir = false;
+          protected void checkColFamDir(Path p) throws IOException {
+            if (!attemptedFirstFamDir) {
+              attemptedFirstFamDir = true;
+              assertTrue(fs.delete(p, true)); // make sure delete happened.
+            }
+            super.checkColFamDir(p);
+          }
+        };
+      }
+    };
+    doQuarantineTest(table, hbck, 3, 0, 0, 0, 1);
+  }
+
+  /**
+   * This creates a table and simulates the race situation where a concurrent compaction or split
+   * has removed a region dir before the corruption checker got to it.
+   */
+  @Test(timeout=120000)
+  public void testQuarantineMissingRegionDir() throws Exception {
+    String table = name.getMethodName();
+    ExecutorService exec = new ScheduledThreadPoolExecutor(10);
+    // inject a fault in the hfcc created.
+    final FileSystem fs = FileSystem.get(conf);
+    HBaseFsck hbck = new HBaseFsck(conf, exec) {
+      public HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
+        return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles) {
+          boolean attemptedFirstRegionDir = false;
+          protected void checkRegionDir(Path p) throws IOException {
+            if (!attemptedFirstRegionDir) {
+              attemptedFirstRegionDir = true;
+              assertTrue(fs.delete(p, true)); // make sure delete happened.
+            }
+            super.checkRegionDir(p);
+          }
+        };
+      }
+    };
+    doQuarantineTest(table, hbck, 3, 0, 0, 0, 1);
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+
+  @org.junit.Rule
+  public TestName name = new TestName();
 }
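
For reference, the new tests drive the quarantine pass entirely through HBaseFsck.exec()
with the -sidelineCorruptHFiles flag and then read counts off the HFileCorruptionChecker;
the three race tests additionally subclass HBaseFsck and override
createHFileCorruptionChecker() to inject faults. Below is a minimal standalone sketch of
the plain calling pattern, using only the constructor and accessors exercised by this
patch (the method name runQuarantinePass and the use of System.out are illustrative
assumptions; imports match the ones added above):

    static void runQuarantinePass(Configuration conf, String table) throws Exception {
      // Same flags the tests pass: sideline corrupt hfiles and skip the permission pre-check.
      String[] args = {"-sidelineCorruptHFiles", "-ignorePreCheckPermission", table};
      ExecutorService exec = new ScheduledThreadPoolExecutor(10);
      HBaseFsck fsck = new HBaseFsck(conf, exec);
      fsck.exec(exec, args);

      // The checker tallies what it saw: files checked, found corrupt, quarantined,
      // failed to check, and missing (removed underneath it).
      HFileCorruptionChecker hfcc = fsck.getHFilecorruptionChecker();
      System.out.println("checked=" + hfcc.getHFilesChecked()
          + " corrupted=" + hfcc.getCorrupted().size()
          + " quarantined=" + hfcc.getQuarantined().size()
          + " failed=" + hfcc.getFailures().size()
          + " missing=" + hfcc.getMissing().size());
    }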

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java?rev=1377941&r1=1377940&r2=1377941&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java Tue Aug 28 01:55:21 2012
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEqu
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.HBaseFsck;
@@ -59,6 +61,21 @@ public class HbckTestingUtil {
     return fsck;
   }
 
+  /**
+   * Runs hbck with the -sidelineCorruptHFiles and -ignorePreCheckPermission options
+   * against the given table.
+   * @param conf configuration to use for the hbck run
+   * @param table name of the table the check is restricted to
+   * @return the HBaseFsck instance after the run; callers can read the return code and the
+   *         HFileCorruptionChecker results from it
+   * @throws Exception
+   */
+  public static HBaseFsck doHFileQuarantine(Configuration conf, String table) throws Exception {
+    String[] args = {"-sidelineCorruptHFiles", "-ignorePreCheckPermission", table};
+    ExecutorService exec = new ScheduledThreadPoolExecutor(10);
+    HBaseFsck hbck = new HBaseFsck(conf, exec);
+    hbck.exec(exec, args);
+    return hbck;
+  }
+
   public static void assertNoErrors(HBaseFsck fsck) throws Exception {
     List<ERROR_CODE> errs = fsck.getErrors().getErrorList();
     assertEquals(new ArrayList<ERROR_CODE>(), errs);
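
The doHFileQuarantine() helper added here is what TestHBaseFsck.testQuarantineCorruptHFile()
calls; the caller is expected to inspect the returned HBaseFsck afterwards. A minimal usage
sketch under that assumption (conf and table come from the surrounding test, and the
expected counts shown are the ones the corrupt-hfile test asserts, not a general rule):

    HBaseFsck res = HbckTestingUtil.doHFileQuarantine(conf, table);
    assertEquals(0, res.getRetCode());
    HFileCorruptionChecker hfcc = res.getHFilecorruptionChecker();
    // One corrupt hfile was detected and sidelined; nothing failed or went missing.
    assertEquals(1, hfcc.getCorrupted().size());
    assertEquals(1, hfcc.getQuarantined().size());
    assertEquals(0, hfcc.getFailures().size());
    assertEquals(0, hfcc.getMissing().size());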