Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/15 20:15:16 UTC

[1/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException

Repository: hbase
Updated Branches:
  refs/heads/0.94 fb4346177 -> ca254ed51
  refs/heads/0.98 2810ba63a -> 42e3e37ee
  refs/heads/branch-1 41aceca80 -> 6968834c9
  refs/heads/master 9ba7337ac -> fec091a80


HBASE-13651 Handle StoreFileScanner FileNotFoundException


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fec091a8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fec091a8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fec091a8

Branch: refs/heads/master
Commit: fec091a8073ab000e47120c244238f1b3642d560
Parents: 9ba7337
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 18:54:37 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 18:57:58 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 112 ++++++---
 .../hbase/regionserver/StoreFileScanner.java    |   9 +
 .../TestCorruptedRegionStoreFile.java           | 250 +++++++++++++++++++
 3 files changed, 333 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
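In outline, the change funnels any FileNotFoundException raised while scanning into one recovery path: the region first forces a re-read of its store file list from the filesystem, and only if that refresh fails too does it abort the region server so the region can be reassigned elsewhere. A minimal sketch of that pattern, for readers skimming the diff below; RegionLike and ServerLike are illustrative stand-ins for this message, not HBase interfaces:

  import java.io.FileNotFoundException;
  import java.io.IOException;

  // Stand-ins for the HRegion/HRegionServer roles in the patch.
  interface RegionLike {
    // re-reads the store file list; force=true runs it even on the primary
    void refreshStoreFiles(boolean force) throws IOException;
  }
  interface ServerLike {
    void abort(String reason);
  }

  class RecoverySketch {
    private final RegionLike region;
    private final ServerLike server;

    RecoverySketch(RegionLike region, ServerLike server) {
      this.region = region;
      this.server = server;
    }

    // Mirrors RegionScannerImpl.handleFileNotFound() in the diff: try to
    // recover by refreshing the store file list; if that also fails, abort
    // the server so the region gets reassigned to a healthy one.
    IOException handleFileNotFound(FileNotFoundException fnfe) {
      try {
        region.refreshStoreFiles(true);
        // refresh worked: fail only this read, later reads see fresh files
        return new IOException("unable to read store file", fnfe);
      } catch (IOException e) {
        server.abort("a store file got lost: " + fnfe.getMessage());
        return new IOException("region is closing", fnfe);
      }
    }
  }
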


http://git-wip-us.apache.org/repos/asf/hbase/blob/fec091a8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5c117f0..3994027 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1103,7 +1103,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public long getNumMutationsWithoutWAL() {
     return numMutationsWithoutWAL.get();
   }
-  
+
   @Override
   public long getDataInMemoryWithoutWAL() {
     return dataInMemoryWithoutWAL.get();
@@ -2360,7 +2360,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     LOG.info(msg);
     status.setStatus(msg);
 
-    return new FlushResultImpl(compactionRequested ? 
+    return new FlushResultImpl(compactionRequested ?
         FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
           FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
         flushOpSeqId);
@@ -4713,7 +4713,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public boolean refreshStoreFiles() throws IOException {
-    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+    return refreshStoreFiles(false);
+  }
+
+  protected boolean refreshStoreFiles(boolean force) throws IOException {
+    if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return false; // if primary nothing to do
     }
 
@@ -5192,15 +5196,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * If the joined heap data gathering is interrupted due to scan limits, this will
      * contain the row for which we are populating the values.*/
     protected Cell joinedContinuationRow = null;
+    private boolean filterClosed = false;
+
+    protected final int isScan;
     protected final byte[] stopRow;
+    protected final HRegion region;
+    protected final CellComparator comparator;
+
+    private final long readPt;
+    private final long maxResultSize;
+    private final ScannerContext defaultScannerContext;
     private final FilterWrapper filter;
-    private ScannerContext defaultScannerContext;
-    protected int isScan;
-    private boolean filterClosed = false;
-    private long readPt;
-    private long maxResultSize;
-    protected HRegion region;
-    protected CellComparator comparator;
 
     @Override
     public HRegionInfo getRegionInfo() {
@@ -5209,7 +5215,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
-
       this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
@@ -5251,10 +5256,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         scanners.addAll(additionalScanners);
       }
 
-      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
-          scan.getFamilyMap().entrySet()) {
+      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        KeyValueScanner scanner;
+        try {
+          scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        } catch (FileNotFoundException e) {
+          throw handleFileNotFound(e);
+        }
         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
           || this.filter.isFamilyEssential(entry.getKey())) {
           scanners.add(scanner);
@@ -5346,7 +5355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-      
+
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
@@ -5397,30 +5406,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
-      do {
-        // We want to maintain any progress that is made towards the limits while scanning across
-        // different column families. To do this, we toggle the keep progress flag on during calls
-        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-        scannerContext.setKeepProgress(true);
-        heap.next(results, scannerContext);
-        scannerContext.setKeepProgress(tmpKeepProgress);
-
-        nextKv = heap.peek();
-        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-
-        if (scannerContext.checkBatchLimit(limitScope)) {
-          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
-        } else if (scannerContext.checkSizeLimit(limitScope)) {
-          ScannerContext.NextState state =
+      try {
+        do {
+          // We want to maintain any progress that is made towards the limits while scanning across
+          // different column families. To do this, we toggle the keep progress flag on during calls
+          // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+          scannerContext.setKeepProgress(true);
+          heap.next(results, scannerContext);
+          scannerContext.setKeepProgress(tmpKeepProgress);
+
+          nextKv = heap.peek();
+          moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+
+          if (scannerContext.checkBatchLimit(limitScope)) {
+            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+          } else if (scannerContext.checkSizeLimit(limitScope)) {
+            ScannerContext.NextState state =
               moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        } else if (scannerContext.checkTimeLimit(limitScope)) {
-          ScannerContext.NextState state =
+            return scannerContext.setScannerState(state).hasMoreValues();
+          } else if (scannerContext.checkTimeLimit(limitScope)) {
+            ScannerContext.NextState state =
               moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        }
-      } while (moreCellsInRow);
-
+            return scannerContext.setScannerState(state).hasMoreValues();
+          }
+        } while (moreCellsInRow);
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
+      }
       return nextKv != null;
     }
 
@@ -5748,18 +5760,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       boolean result = false;
       startRegionOperation();
+      KeyValue kv = KeyValueUtil.createFirstOnRow(row);
       try {
-        KeyValue kv = KeyValueUtil.createFirstOnRow(row);
         // use request seek to make use of the lazy seek option. See HBASE-5520
         result = this.storeHeap.requestSeek(kv, true, true);
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
       } finally {
         closeRegionOperation();
       }
       return result;
     }
+
+    private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
+      // tries to refresh the store files, otherwise shutdown the RS.
+      // TODO: add support for abort() of a single region and trigger reassignment.
+      try {
+        region.refreshStoreFiles(true);
+        return new IOException("unable to read store file");
+      } catch (IOException e) {
+        String msg = "a store file got lost: " + fnfe.getMessage();
+        LOG.error("unable to refresh store files", e);
+        abortRegionServer(msg);
+        return new NotServingRegionException(
+          getRegionInfo().getRegionNameAsString() + " is closing");
+      }
+    }
+
+    private void abortRegionServer(String msg) throws IOException {
+      if (rsServices instanceof HRegionServer) {
+        ((HRegionServer)rsServices).abort(msg);
+      }
+      throw new UnsupportedOperationException("not able to abort RS after: " + msg);
+    }
   }
 
   // Utility methods

http://git-wip-us.apache.org/repos/asf/hbase/blob/fec091a8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c0a5d5e..42a378d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -143,6 +144,8 @@ public class StoreFileScanner implements KeyValueScanner {
           skipKVsNewerThanReadpoint();
         }
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
     }
@@ -169,6 +172,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
@@ -193,6 +198,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not reseek " + this + " to key " + key,
           ioe);
@@ -451,6 +458,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seekToPreviousRow " + this + " to key "
           + key, ioe);
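
A note on the StoreFileScanner hunks above: FileNotFoundException is a subclass of IOException and Java matches catch clauses top to bottom, so the added "catch (FileNotFoundException e) { throw e; }" must sit before the generic wrapper for the original exception to reach HRegion unwrapped, where handleFileNotFound() can recognize it. A standalone demo of that ordering (not HBase code):

  import java.io.FileNotFoundException;
  import java.io.IOException;

  class CatchOrderDemo {
    static void read(boolean fileMissing) throws IOException {
      try {
        if (fileMissing) throw new FileNotFoundException("hfile gone");
        // ... normal read path ...
      } catch (FileNotFoundException e) {
        throw e; // propagate unwrapped so callers can match on it
      } catch (IOException e) {
        throw new IOException("Could not iterate", e); // all else wrapped
      }
    }

    public static void main(String[] args) {
      try {
        read(true);
      } catch (FileNotFoundException e) {
        System.out.println("caught FNFE directly: " + e.getMessage());
      } catch (IOException e) {
        System.out.println("caught wrapped IOException");
      }
    }
  }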

http://git-wip-us.apache.org/repos/asf/hbase/blob/fec091a8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..8c9312c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestCorruptedRegionStoreFile {
+  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  private static final int NUM_FILES = 25;
+  private static final int ROW_PER_FILE = 2000;
+  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+  @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+  private Path tableDir;
+  private int rowCount;
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+  }
+
+  private void setupTable(final TableName tableName) throws IOException {
+    // load the table
+    Table table = UTIL.createTable(tableName, FAMILY_NAME);
+    try {
+      rowCount = 0;
+      byte[] value = new byte[1024];
+      byte[] q = Bytes.toBytes("q");
+      while (rowCount < NUM_ROWS) {
+        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(FAMILY_NAME, q, value);
+        table.put(put);
+
+        if ((rowCount++ % ROW_PER_FILE) == 0) {
+          // flush it
+          UTIL.getHBaseAdmin().flush(tableName);
+        }
+      }
+    } finally {
+      UTIL.getHBaseAdmin().flush(tableName);
+      table.close();
+    }
+
+    assertEquals(NUM_ROWS, rowCount);
+
+    // get the store file paths
+    storeFiles.clear();
+    tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+      @Override
+      public void storeFile(final String region, final String family, final String hfile)
+          throws IOException {
+        HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
+        storeFiles.add(link.getOriginPath());
+      }
+    });
+    assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+    LOG.info("store-files: " + storeFiles);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 3);
+
+    setupTable(TEST_TABLE.getTableName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileDuringScan() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScanNext(Table table) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileAfterScannerInit() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScan(Table table, Scan scan) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private void evictHFileCache(final Path hfile) throws Exception {
+    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegionServer rs = rst.getRegionServer();
+      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+    }
+    Thread.sleep(6000);
+  }
+
+  private int fullScanAndCount(final TableName tableName) throws Exception {
+    return fullScanAndCount(tableName, new ScanInjector());
+  }
+
+  private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+      throws Exception {
+    Table table = UTIL.getConnection().getTable(tableName);
+    int count = 0;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setCacheBlocks(false);
+      injector.beforeScan(table, scan);
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        while (true) {
+          injector.beforeScanNext(table);
+          Result result = scanner.next();
+          injector.afterScanNext(table, result);
+          if (result == null) break;
+          if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+            LOG.debug("scan next " + count);
+          }
+        }
+      } finally {
+        scanner.close();
+        injector.afterScan(table);
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
+
+  private class ScanInjector {
+    protected void beforeScan(Table table, Scan scan) throws Exception {}
+    protected void beforeScanNext(Table table) throws Exception {}
+    protected void afterScanNext(Table table, Result result) throws Exception {}
+    protected void afterScan(Table table) throws Exception {}
+  }
+}
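
For the arithmetic behind the assertions in both tests: the loader flushes roughly every ROW_PER_FILE = 2000 puts out of NUM_ROWS = 50000 total, so losing a single store file should cost at most about one file's worth of rows. A standalone snippet with the constants copied from the test above:

  // Worked numbers behind "count >= (NUM_ROWS - ROW_PER_FILE)".
  public class LostFileBound {
    public static void main(String[] args) {
      int NUM_FILES = 25;
      int ROW_PER_FILE = 2000;
      int NUM_ROWS = NUM_FILES * ROW_PER_FILE;  // 50000 rows written
      int worstCase = NUM_ROWS - ROW_PER_FILE;  // 48000: one file's rows lost
      System.out.println("scan must still return >= " + worstCase
          + " of " + NUM_ROWS + " rows");
    }
  }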


[2/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException

Posted by mb...@apache.org.
HBASE-13651 Handle StoreFileScanner FileNotFoundException


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6968834c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6968834c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6968834c

Branch: refs/heads/branch-1
Commit: 6968834c9c96c103e3a87f1be0dace49f2c9461e
Parents: 41aceca
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 19:04:22 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 19:04:22 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 114 ++++++---
 .../hbase/regionserver/StoreFileScanner.java    |   9 +
 .../TestCorruptedRegionStoreFile.java           | 249 +++++++++++++++++++
 3 files changed, 333 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 66c09a8..1bb0865 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1108,7 +1108,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public long getNumMutationsWithoutWAL() {
     return numMutationsWithoutWAL.get();
   }
-  
+
   @Override
   public long getDataInMemoryWithoutWAL() {
     return dataInMemoryWithoutWAL.get();
@@ -2365,7 +2365,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     LOG.info(msg);
     status.setStatus(msg);
 
-    return new FlushResultImpl(compactionRequested ? 
+    return new FlushResultImpl(compactionRequested ?
         FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
           FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
         flushOpSeqId);
@@ -4721,7 +4721,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public boolean refreshStoreFiles() throws IOException {
-    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+    return refreshStoreFiles(false);
+  }
+
+  protected boolean refreshStoreFiles(boolean force) throws IOException {
+    if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return false; // if primary nothing to do
     }
 
@@ -5200,14 +5204,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * If the joined heap data gathering is interrupted due to scan limits, this will
      * contain the row for which we are populating the values.*/
     protected Cell joinedContinuationRow = null;
+    private boolean filterClosed = false;
+
+    protected final int isScan;
     protected final byte[] stopRow;
+    protected final HRegion region;
+
+    private final long readPt;
+    private final long maxResultSize;
+    private final ScannerContext defaultScannerContext;
     private final FilterWrapper filter;
-    private ScannerContext defaultScannerContext;
-    protected int isScan;
-    private boolean filterClosed = false;
-    private long readPt;
-    private long maxResultSize;
-    protected HRegion region;
 
     @Override
     public HRegionInfo getRegionInfo() {
@@ -5216,7 +5222,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
-
       this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
@@ -5257,10 +5262,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         scanners.addAll(additionalScanners);
       }
 
-      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
-          scan.getFamilyMap().entrySet()) {
+      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        KeyValueScanner scanner;
+        try {
+          scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        } catch (FileNotFoundException e) {
+          throw handleFileNotFound(e);
+        }
         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
           || this.filter.isFamilyEssential(entry.getKey())) {
           scanners.add(scanner);
@@ -5352,7 +5361,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-      
+
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
@@ -5403,30 +5412,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
-      do {
-        // We want to maintain any progress that is made towards the limits while scanning across
-        // different column families. To do this, we toggle the keep progress flag on during calls
-        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-        scannerContext.setKeepProgress(true);
-        heap.next(results, scannerContext);
-        scannerContext.setKeepProgress(tmpKeepProgress);
-
-        nextKv = heap.peek();
-        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-
-        if (scannerContext.checkBatchLimit(limitScope)) {
-          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
-        } else if (scannerContext.checkSizeLimit(limitScope)) {
-          ScannerContext.NextState state =
-              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        } else if (scannerContext.checkTimeLimit(limitScope)) {
-          ScannerContext.NextState state =
-              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        }
-      } while (moreCellsInRow);
-
+      try {
+        do {
+          // We want to maintain any progress that is made towards the limits while scanning across
+          // different column families. To do this, we toggle the keep progress flag on during calls
+          // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+          scannerContext.setKeepProgress(true);
+          heap.next(results, scannerContext);
+          scannerContext.setKeepProgress(tmpKeepProgress);
+
+          nextKv = heap.peek();
+          moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+
+          if (scannerContext.checkBatchLimit(limitScope)) {
+            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+          } else if (scannerContext.checkSizeLimit(limitScope)) {
+            ScannerContext.NextState state =
+                moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+            return scannerContext.setScannerState(state).hasMoreValues();
+          } else if (scannerContext.checkTimeLimit(limitScope)) {
+            ScannerContext.NextState state =
+                moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+            return scannerContext.setScannerState(state).hasMoreValues();
+          }
+        } while (moreCellsInRow);
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
+      }
       return nextKv != null;
     }
 
@@ -5753,18 +5765,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       boolean result = false;
       startRegionOperation();
+      KeyValue kv = KeyValueUtil.createFirstOnRow(row);
       try {
-        KeyValue kv = KeyValueUtil.createFirstOnRow(row);
         // use request seek to make use of the lazy seek option. See HBASE-5520
         result = this.storeHeap.requestSeek(kv, true, true);
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
       } finally {
         closeRegionOperation();
       }
       return result;
     }
+
+    private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
+      // tries to refresh the store files, otherwise shutdown the RS.
+      // TODO: add support for abort() of a single region and trigger reassignment.
+      try {
+        region.refreshStoreFiles(true);
+        return new IOException("unable to read store file");
+      } catch (IOException e) {
+        String msg = "a store file got lost: " + fnfe.getMessage();
+        LOG.error(msg);
+        LOG.error("unable to refresh store files", e);
+        abortRegionServer(msg);
+        return new NotServingRegionException(getRegionInfo().getRegionNameAsString() +" is closing");
+      }
+    }
+
+    private void abortRegionServer(String msg) throws IOException {
+      if (rsServices instanceof HRegionServer) {
+        ((HRegionServer)rsServices).abort(msg);
+      }
+      throw new UnsupportedOperationException("not able to abort RS after: " + msg);
+    }
   }
 
   // Utility methods

http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c27b455..961352d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -142,6 +143,8 @@ public class StoreFileScanner implements KeyValueScanner {
           skipKVsNewerThanReadpoint();
         }
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
     }
@@ -168,6 +171,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
@@ -192,6 +197,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not reseek " + this + " to key " + key,
           ioe);
@@ -453,6 +460,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seekToPreviousRow " + this + " to key "
           + key, ioe);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..dce19d6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(LargeTests.class)
+public class TestCorruptedRegionStoreFile {
+  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  private static final int NUM_FILES = 25;
+  private static final int ROW_PER_FILE = 2000;
+  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+  @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+  private Path tableDir;
+  private int rowCount;
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+  }
+
+  private void setupTable(final TableName tableName) throws IOException {
+    // load the table
+    Table table = UTIL.createTable(tableName, FAMILY_NAME);
+    try {
+      rowCount = 0;
+      byte[] value = new byte[1024];
+      byte[] q = Bytes.toBytes("q");
+      while (rowCount < NUM_ROWS) {
+        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(FAMILY_NAME, q, value);
+        table.put(put);
+
+        if ((rowCount++ % ROW_PER_FILE) == 0) {
+          // flush it
+          UTIL.getHBaseAdmin().flush(tableName);
+        }
+      }
+    } finally {
+      UTIL.getHBaseAdmin().flush(tableName);
+      table.close();
+    }
+
+    assertEquals(NUM_ROWS, rowCount);
+
+    // get the store file paths
+    storeFiles.clear();
+    tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+      @Override
+      public void storeFile(final String region, final String family, final String hfile)
+          throws IOException {
+        HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
+        storeFiles.add(link.getOriginPath());
+      }
+    });
+    assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+    LOG.info("store-files: " + storeFiles);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 3);
+
+    setupTable(TEST_TABLE.getTableName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileDuringScan() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScanNext(Table table) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileAfterScannerInit() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScan(Table table, Scan scan) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private void evictHFileCache(final Path hfile) throws Exception {
+    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegionServer rs = rst.getRegionServer();
+      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+    }
+    Thread.sleep(6000);
+  }
+
+  private int fullScanAndCount(final TableName tableName) throws Exception {
+    return fullScanAndCount(tableName, new ScanInjector());
+  }
+
+  private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+      throws Exception {
+    Table table = UTIL.getConnection().getTable(tableName);
+    int count = 0;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setCacheBlocks(false);
+      injector.beforeScan(table, scan);
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        while (true) {
+          injector.beforeScanNext(table);
+          Result result = scanner.next();
+          injector.afterScanNext(table, result);
+          if (result == null) break;
+          if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+            LOG.debug("scan next " + count);
+          }
+        }
+      } finally {
+        scanner.close();
+        injector.afterScan(table);
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
+
+  private class ScanInjector {
+    protected void beforeScan(Table table, Scan scan) throws Exception {}
+    protected void beforeScanNext(Table table) throws Exception {}
+    protected void afterScanNext(Table table, Result result) throws Exception {}
+    protected void afterScan(Table table) throws Exception {}
+  }
+}


[4/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException

Posted by mb...@apache.org.
HBASE-13651 Handle StoreFileScanner FileNotFoundException


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca254ed5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca254ed5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca254ed5

Branch: refs/heads/0.94
Commit: ca254ed51514d2f14fc1f751778fb82df423aab8
Parents: fb43461
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 19:09:40 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 19:09:40 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  37 ++-
 .../hbase/regionserver/HRegionServer.java       |   4 +
 .../apache/hadoop/hbase/regionserver/Store.java |   1 +
 .../hbase/regionserver/StoreFileScanner.java    |   7 +
 .../TestCorruptedRegionStoreFile.java           | 244 +++++++++++++++++++
 5 files changed, 285 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5da00dc..8f20376 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4024,7 +4024,13 @@ public class HRegion implements HeapSize { // , Writable{
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
+        KeyValueScanner scanner;
+        try {
+          scanner = store.getScanner(scan, entry.getValue());
+        } catch (FileNotFoundException e) {
+          abortRegionServer(e.getMessage());
+          throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
+        }
         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
           || FilterBase.isFamilyEssential(this.filter, entry.getKey())) {
           scanners.add(scanner);
@@ -4152,13 +4158,18 @@ public class HRegion implements HeapSize { // , Writable{
     private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
         byte[] currentRow, int offset, short length, String metric) throws IOException {
       KeyValue nextKv;
-      do {
-        heap.next(results, limit - results.size(), metric);
-        if (limit > 0 && results.size() == limit) {
-          return KV_LIMIT;
-        }
-        nextKv = heap.peek();
-      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+      try {
+        do {
+          heap.next(results, limit - results.size(), metric);
+          if (limit > 0 && results.size() == limit) {
+            return KV_LIMIT;
+          }
+          nextKv = heap.peek();
+        } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+      } catch (FileNotFoundException e) {
+        abortRegionServer(e.getMessage());
+        throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
+      }
       return nextKv;
     }
 
@@ -4368,6 +4379,9 @@ public class HRegion implements HeapSize { // , Writable{
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
+      } catch (FileNotFoundException e) {
+        abortRegionServer(e.getMessage());
+        throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
       } finally {
         closeRegionOperation();
       }
@@ -6034,6 +6048,13 @@ public class HRegion implements HeapSize { // , Writable{
     lock.readLock().unlock();
   }
 
+  public void abortRegionServer(String msg) throws IOException {
+    RegionServerServices rs = getRegionServerServices();
+    if (rs instanceof HRegionServer) {
+      ((HRegionServer)rs).abort(msg);
+    }
+  }
+
   /**
    * This method needs to be called before any public call that reads or
    * modifies stores in bulk. It has to be called just before a try.
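
Worth noting for this 0.94 backport: 0.94 has no refreshStoreFiles(), so unlike the master patch earlier in this thread there is no refresh-and-retry step; on a FileNotFoundException the scanner aborts the region server outright and throws NotServingRegionException so clients retry against a reassigned region. A minimal sketch of that simpler flow, with stand-in names rather than 0.94 API:

  import java.io.FileNotFoundException;
  import java.io.IOException;

  // Stand-in sketch of the 0.94 handling: abort directly, no refresh attempt.
  class Sketch094 {
    interface ServerLike { void abort(String reason); }

    static IOException onFileNotFound(FileNotFoundException e,
        ServerLike server, String regionName) {
      server.abort(e.getMessage());  // best-effort abort of the region server
      // the caller throws this; clients see the region as closing and retry
      return new IOException(regionName + " is closing");
    }
  }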

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index eadc9e8..5499135 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3446,6 +3446,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     return conf;
   }
 
+  public CacheConfig getCacheConfig() {
+    return cacheConfig;
+  }
+
   /** @return the write lock for the server */
   ReentrantReadWriteLock.WriteLock getWriteLock() {
     return lock.writeLock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 7424788..3bd9f11 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index b055630..1594a0a 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -129,6 +130,8 @@ public class StoreFileScanner implements KeyValueScanner {
         if (hasMVCCInfo)
           skipKVsNewerThanReadpoint();
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
     }
@@ -151,6 +154,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
@@ -171,6 +176,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not reseek " + this + " to key " + key,
           ioe);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..c4e0a7d
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCorruptedRegionStoreFile {
+  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  private static final int NUM_FILES = 25;
+  private static final int ROW_PER_FILE = 2000;
+  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+  private Path tableDir;
+  private int rowCount;
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+  }
+
+  private void setupTable(final String tableName) throws Exception {
+    // load the table
+    HTable table = UTIL.createTable(Bytes.toBytes(tableName), FAMILY_NAME);
+    try {
+      rowCount = 0;
+      byte[] value = new byte[1024];
+      byte[] q = Bytes.toBytes("q");
+      while (rowCount < NUM_ROWS) {
+        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(FAMILY_NAME, q, value);
+        table.put(put);
+
+        if ((rowCount++ % ROW_PER_FILE) == 0) {
+          // flush it
+          UTIL.getHBaseAdmin().flush(tableName);
+        }
+      }
+    } finally {
+      UTIL.getHBaseAdmin().flush(tableName);
+      table.close();
+    }
+
+    assertEquals(NUM_ROWS, rowCount);
+
+    // get the store file paths
+    storeFiles.clear();
+    tableDir = FSUtils.getTablePath(getRootDir(), tableName);
+    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+      @Override
+      public void storeFile(final String region, final String family, final String hfile)
+          throws IOException {
+        HFileLink link = HFileLink.create(UTIL.getConfiguration(), tableName, region, family, hfile);
+        storeFiles.add(link.getOriginPath());
+      }
+    });
+    assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+    LOG.info("store-files: " + storeFiles);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=90000)
+  public void testLosingFileDuringScan() throws Exception {
+    final String tableName = "testLosingFileDuringScan";
+    setupTable(tableName);
+    assertEquals(rowCount, fullScanAndCount(tableName));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(tableName, new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScanNext(HTable table) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Moved store file " + storeFiles.get(0) + " to " + tmpStoreFilePath);
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count, count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  @Test(timeout=90000)
+  public void testLosingFileAfterScannerInit() throws Exception {
+    final String tableName = "testLosingFileAfterScannerInit";
+    setupTable(tableName);
+    assertEquals(rowCount, fullScanAndCount(tableName));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(tableName, new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScan(HTable table, Scan scan) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Moved store file " + storeFiles.get(0) + " to " + tmpStoreFilePath);
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count, count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private void evictHFileCache(final Path hfile) throws Exception {
+    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegionServer rs = rst.getRegionServer();
+      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+    }
+    // fixed pause to let the servers settle before the scan continues
+    Thread.sleep(6000);
+  }
+
+  private int fullScanAndCount(final String tableName) throws Exception {
+    return fullScanAndCount(tableName, new ScanInjector());
+  }
+
+  private int fullScanAndCount(final String tableName, final ScanInjector injector)
+      throws Exception {
+    HTable table = new HTable(UTIL.getConfiguration(), tableName);
+    int count = 0;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setCacheBlocks(false);
+      injector.beforeScan(table, scan);
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        while (true) {
+          injector.beforeScanNext(table);
+          Result result = scanner.next();
+          injector.afterScanNext(table, result);
+          if (result == null) break;
+          if ((count++ % 1000) == 0) {
+            LOG.debug("scan next " + count);
+          }
+        }
+      } finally {
+        scanner.close();
+        injector.afterScan(table);
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
+
+  private class ScanInjector {
+    protected void beforeScan(HTable table, Scan scan) throws Exception {}
+    protected void beforeScanNext(HTable table) throws Exception {}
+    protected void afterScanNext(HTable table, Result result) throws Exception {}
+    protected void afterScan(HTable table) throws Exception {}
+  }
+}
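
The ScanInjector class above is a small fault-injection seam: a no-op base
class whose hooks bracket scanner creation and every next() call, so a test
overrides exactly the hook where it wants to perturb the region. As a
hypothetical example (not part of the patch), an injector that forces an
extra flush before the first next() would look like this inside the test
class:

  int count = fullScanAndCount(tableName, new ScanInjector() {
    private boolean flushed = false;

    @Override
    public void beforeScanNext(HTable table) throws Exception {
      if (!flushed) {
        // same admin flush the loader in setupTable() uses
        UTIL.getHBaseAdmin().flush(tableName);
        flushed = true;
      }
    }
  });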


[3/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException

Posted by mb...@apache.org.
HBASE-13651 Handle StoreFileScanner FileNotFoundException


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/42e3e37e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/42e3e37e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/42e3e37e

Branch: refs/heads/0.98
Commit: 42e3e37ee3b3d3fc3d348e3888f07237b680e594
Parents: 2810ba6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 19:08:42 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 19:08:42 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  39 ++-
 .../apache/hadoop/hbase/regionserver/Store.java |   1 +
 .../hbase/regionserver/StoreFileScanner.java    |   7 +
 .../TestCorruptedRegionStoreFile.java           | 250 +++++++++++++++++++
 4 files changed, 288 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
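
For orientation before the hunks: StoreFileScanner now lets
FileNotFoundException escape instead of wrapping it into a generic
IOException, and the HRegion scanner paths react to it by aborting the region
server and rethrowing NotServingRegionException, so the client re-locates the
region and retries once it is reopened elsewhere. A condensed, self-contained
sketch of that control flow (the Server/Store stubs are hypothetical; only
the shape matches the patch):

  import java.io.FileNotFoundException;
  import java.io.IOException;
  import org.apache.hadoop.hbase.NotServingRegionException;

  public class AbortOnMissingFileSketch {
    interface Server { void abort(String why); }
    interface Store  { AutoCloseable getScanner() throws IOException; }

    static AutoCloseable openScanner(Store store, Server server, String regionName)
        throws IOException {
      try {
        return store.getScanner();
      } catch (FileNotFoundException e) {
        // a store file vanished underneath the region: abort this server so
        // the region gets reassigned, and tell the client to retry elsewhere
        server.abort(e.getMessage());
        throw new NotServingRegionException(regionName + " is closing");
      }
    }
  }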


http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8148da5..14a6ae8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -3971,7 +3972,13 @@ public class HRegion implements HeapSize { // , Writable{
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        KeyValueScanner scanner;
+        try {
+          scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        } catch (FileNotFoundException e) {
+          abortRegionServer(e.getMessage());
+          throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
+        }
         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
           || this.filter.isFamilyEssential(entry.getKey())) {
           scanners.add(scanner);
@@ -4105,14 +4112,18 @@ public class HRegion implements HeapSize { // , Writable{
     private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
         byte[] currentRow, int offset, short length) throws IOException {
       KeyValue nextKv;
-      do {
-        heap.next(results, limit - results.size());
-        if (limit > 0 && results.size() == limit) {
-          return KV_LIMIT;
-        }
-        nextKv = heap.peek();
-      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
-
+      try {
+        do {
+          heap.next(results, limit - results.size());
+          if (limit > 0 && results.size() == limit) {
+            return KV_LIMIT;
+          }
+          nextKv = heap.peek();
+        } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+      } catch (FileNotFoundException e) {
+        abortRegionServer(e.getMessage());
+        throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
+      }
       return nextKv;
     }
 
@@ -4336,6 +4347,9 @@ public class HRegion implements HeapSize { // , Writable{
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
+      } catch (FileNotFoundException e) {
+        abortRegionServer(e.getMessage());
+        throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
       } finally {
         closeRegionOperation();
       }
@@ -6078,6 +6092,13 @@ public class HRegion implements HeapSize { // , Writable{
     this.coprocessorHost = coprocessorHost;
   }
 
+  public void abortRegionServer(String msg) throws IOException {
+    RegionServerServices rs = getRegionServerServices();
+    if (rs instanceof HRegionServer) {
+      ((HRegionServer)rs).abort(msg);
+    }
+  }
+
   /**
    * This method needs to be called before any public call that reads or
    * modifies data. It has to be called just before a try.
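
A note on the design choice above: rather than trying to patch up the scanner
in place, the region server aborts itself; reopening the region on another
server rebuilds the store-file list from what actually exists on disk. The
NotServingRegionException it throws is one the stock HBase client already
retries on after re-locating the region; a hand-rolled caller wanting the
same behavior could approximate it like this (hypothetical helper, not part
of the patch):

  import java.util.concurrent.Callable;
  import org.apache.hadoop.hbase.NotServingRegionException;

  public class RetrySketch {
    static <T> T withRetries(Callable<T> call, int attempts) throws Exception {
      for (int i = 0; ; ++i) {
        try {
          return call.call();
        } catch (NotServingRegionException e) {
          if (i + 1 >= attempts) throw e;        // out of attempts: give up
          Thread.sleep(1000L << Math.min(i, 4)); // crude exponential backoff
        }
      }
    }
  }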

http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index a116a4f..c9d101f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index ac3d12f..e4116fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -138,6 +139,8 @@ public class StoreFileScanner implements KeyValueScanner {
         if (hasMVCCInfo)
           skipKVsNewerThanReadpoint();
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
     }
@@ -160,6 +163,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
@@ -180,6 +185,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not reseek " + this + " to key " + key,
           ioe);
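
The three StoreFileScanner hunks above share one idiom: catch
FileNotFoundException and rethrow it untouched before the broader catch wraps
everything else, so the specific type survives up to HRegion. A minimal
self-contained illustration of that shape (the Reader/Block types are made-up
stand-ins, not HBase APIs):

  import java.io.FileNotFoundException;
  import java.io.IOException;

  public class RethrowShape {
    interface Block { }
    interface Reader { Block read(long offset) throws IOException; }

    static Block readWithContext(Reader r, long offset) throws IOException {
      try {
        return r.read(offset);
      } catch (FileNotFoundException e) {
        throw e;  // preserve the missing-file case for upstream handlers
      } catch (IOException ioe) {
        throw new IOException("Could not read block at " + offset, ioe);
      }
    }
  }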

http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..d9d00f5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(LargeTests.class)
+public class TestCorruptedRegionStoreFile {
+  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  private static final int NUM_FILES = 25;
+  private static final int ROW_PER_FILE = 2000;
+  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+  @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+  private Path tableDir;
+  private int rowCount;
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+  }
+
+  private void setupTable(final TableName tableName) throws Exception {
+    // load the table
+    String tableNameStr = tableName.getNameAsString();
+    HTable table = UTIL.createTable(tableName, FAMILY_NAME);
+    try {
+      rowCount = 0;
+      byte[] value = new byte[1024];
+      byte[] q = Bytes.toBytes("q");
+      while (rowCount < NUM_ROWS) {
+        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(FAMILY_NAME, q, value);
+        table.put(put);
+
+        if ((rowCount++ % ROW_PER_FILE) == 0) {
+          // flush it
+          UTIL.getHBaseAdmin().flush(tableNameStr);
+        }
+      }
+    } finally {
+      UTIL.getHBaseAdmin().flush(tableNameStr);
+      table.close();
+    }
+
+    assertEquals(NUM_ROWS, rowCount);
+
+    // get the store file paths
+    storeFiles.clear();
+    tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+      @Override
+      public void storeFile(final String region, final String family, final String hfile)
+          throws IOException {
+        HFileLink link = HFileLink.create(UTIL.getConfiguration(), tableName, region, family, hfile);
+        storeFiles.add(link.getOriginPath());
+      }
+    });
+    assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+    LOG.info("store-files: " + storeFiles);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 3);
+
+    setupTable(TEST_TABLE.getTableName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileDuringScan() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScanNext(HTable table) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Moved store file " + storeFiles.get(0) + " to " + tmpStoreFilePath);
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileAfterScannerInit() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScan(HTable table, Scan scan) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Moved store file " + storeFiles.get(0) + " to " + tmpStoreFilePath);
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private void evictHFileCache(final Path hfile) throws Exception {
+    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegionServer rs = rst.getRegionServer();
+      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+    }
+    // fixed pause to let the servers settle before the scan continues
+    Thread.sleep(6000);
+  }
+
+  private int fullScanAndCount(final TableName tableName) throws Exception {
+    return fullScanAndCount(tableName, new ScanInjector());
+  }
+
+  private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+      throws Exception {
+    HTable table = new HTable(UTIL.getConfiguration(), tableName);
+    int count = 0;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setCacheBlocks(false);
+      injector.beforeScan(table, scan);
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        while (true) {
+          injector.beforeScanNext(table);
+          Result result = scanner.next();
+          injector.afterScanNext(table, result);
+          if (result == null) break;
+          if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+            LOG.debug("scan next " + count);
+          }
+        }
+      } finally {
+        scanner.close();
+        injector.afterScan(table);
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
+
+  private class ScanInjector {
+    protected void beforeScan(HTable table, Scan scan) throws Exception {}
+    protected void beforeScanNext(HTable table) throws Exception {}
+    protected void afterScanNext(HTable table, Result result) throws Exception {}
+    protected void afterScan(HTable table) throws Exception {}
+  }
+}
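
For reference, the bound both assertions above encode: the loader writes
NUM_FILES * ROW_PER_FILE = 25 * 2000 = 50000 rows, and losing a single store
file can cost at most ROW_PER_FILE = 2000 of them, so a scan that survives
the fault must still return count >= 50000 - 2000 = 48000 rows.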