Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/15 20:15:16 UTC
[1/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException
Repository: hbase
Updated Branches:
refs/heads/0.94 fb4346177 -> ca254ed51
refs/heads/0.98 2810ba63a -> 42e3e37ee
refs/heads/branch-1 41aceca80 -> 6968834c9
refs/heads/master 9ba7337ac -> fec091a80
HBASE-13651 Handle StoreFileScanner FileNotFoundException
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fec091a8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fec091a8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fec091a8
Branch: refs/heads/master
Commit: fec091a8073ab000e47120c244238f1b3642d560
Parents: 9ba7337
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 18:54:37 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 18:57:58 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 112 ++++++---
.../hbase/regionserver/StoreFileScanner.java | 9 +
.../TestCorruptedRegionStoreFile.java | 250 +++++++++++++++++++
3 files changed, 333 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fec091a8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5c117f0..3994027 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1103,7 +1103,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public long getNumMutationsWithoutWAL() {
return numMutationsWithoutWAL.get();
}
-
+
@Override
public long getDataInMemoryWithoutWAL() {
return dataInMemoryWithoutWAL.get();
@@ -2360,7 +2360,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.info(msg);
status.setStatus(msg);
- return new FlushResultImpl(compactionRequested ?
+ return new FlushResultImpl(compactionRequested ?
FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
flushOpSeqId);
@@ -4713,7 +4713,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public boolean refreshStoreFiles() throws IOException {
- if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ return refreshStoreFiles(false);
+ }
+
+ protected boolean refreshStoreFiles(boolean force) throws IOException {
+ if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return false; // if primary nothing to do
}
@@ -5192,15 +5196,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* If the joined heap data gathering is interrupted due to scan limits, this will
* contain the row for which we are populating the values.*/
protected Cell joinedContinuationRow = null;
+ private boolean filterClosed = false;
+
+ protected final int isScan;
protected final byte[] stopRow;
+ protected final HRegion region;
+ protected final CellComparator comparator;
+
+ private final long readPt;
+ private final long maxResultSize;
+ private final ScannerContext defaultScannerContext;
private final FilterWrapper filter;
- private ScannerContext defaultScannerContext;
- protected int isScan;
- private boolean filterClosed = false;
- private long readPt;
- private long maxResultSize;
- protected HRegion region;
- protected CellComparator comparator;
@Override
public HRegionInfo getRegionInfo() {
@@ -5209,7 +5215,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
-
this.region = region;
this.maxResultSize = scan.getMaxResultSize();
if (scan.hasFilter()) {
@@ -5251,10 +5256,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
scanners.addAll(additionalScanners);
}
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
- scan.getFamilyMap().entrySet()) {
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ KeyValueScanner scanner;
+ try {
+ scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
+ }
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
@@ -5346,7 +5355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
-
+
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
@@ -5397,30 +5406,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean tmpKeepProgress = scannerContext.getKeepProgress();
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
- do {
- // We want to maintain any progress that is made towards the limits while scanning across
- // different column families. To do this, we toggle the keep progress flag on during calls
- // to the StoreScanner to ensure that any progress made thus far is not wiped away.
- scannerContext.setKeepProgress(true);
- heap.next(results, scannerContext);
- scannerContext.setKeepProgress(tmpKeepProgress);
-
- nextKv = heap.peek();
- moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-
- if (scannerContext.checkBatchLimit(limitScope)) {
- return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
- } else if (scannerContext.checkSizeLimit(limitScope)) {
- ScannerContext.NextState state =
+ try {
+ do {
+ // We want to maintain any progress that is made towards the limits while scanning across
+ // different column families. To do this, we toggle the keep progress flag on during calls
+ // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+ scannerContext.setKeepProgress(true);
+ heap.next(results, scannerContext);
+ scannerContext.setKeepProgress(tmpKeepProgress);
+
+ nextKv = heap.peek();
+ moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+
+ if (scannerContext.checkBatchLimit(limitScope)) {
+ return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+ } else if (scannerContext.checkSizeLimit(limitScope)) {
+ ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- } else if (scannerContext.checkTimeLimit(limitScope)) {
- ScannerContext.NextState state =
+ return scannerContext.setScannerState(state).hasMoreValues();
+ } else if (scannerContext.checkTimeLimit(limitScope)) {
+ ScannerContext.NextState state =
moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- }
- } while (moreCellsInRow);
-
+ return scannerContext.setScannerState(state).hasMoreValues();
+ }
+ } while (moreCellsInRow);
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
+ }
return nextKv != null;
}
@@ -5748,18 +5760,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
boolean result = false;
startRegionOperation();
+ KeyValue kv = KeyValueUtil.createFirstOnRow(row);
try {
- KeyValue kv = KeyValueUtil.createFirstOnRow(row);
// use request seek to make use of the lazy seek option. See HBASE-5520
result = this.storeHeap.requestSeek(kv, true, true);
if (this.joinedHeap != null) {
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
} finally {
closeRegionOperation();
}
return result;
}
+
+ private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
+ // tries to refresh the store files, otherwise shutdown the RS.
+ // TODO: add support for abort() of a single region and trigger reassignment.
+ try {
+ region.refreshStoreFiles(true);
+ return new IOException("unable to read store file");
+ } catch (IOException e) {
+ String msg = "a store file got lost: " + fnfe.getMessage();
+ LOG.error("unable to refresh store files", e);
+ abortRegionServer(msg);
+ return new NotServingRegionException(
+ getRegionInfo().getRegionNameAsString() + " is closing");
+ }
+ }
+
+ private void abortRegionServer(String msg) throws IOException {
+ if (rsServices instanceof HRegionServer) {
+ ((HRegionServer)rsServices).abort(msg);
+ }
+ throw new UnsupportedOperationException("not able to abort RS after: " + msg);
+ }
}
// Utility methods
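
Taken together, the HRegion hunks above funnel every FileNotFoundException raised during a scan into one recovery path: force-refresh the region's store-file list so a client retry can succeed, or, if the refresh fails too, abort the regionserver. A minimal standalone sketch of that flow follows; the Region/Server interfaces and the class name are invented for illustration, and where this sketch returns a plain IOException on the failure branch the committed code returns NotServingRegionException:

import java.io.FileNotFoundException;
import java.io.IOException;

class FileNotFoundRecoverySketch {
  interface Region { void refreshStoreFiles(boolean force) throws IOException; }
  interface Server { void abort(String msg); }

  private final Region region;
  private final Server server;

  FileNotFoundRecoverySketch(Region region, Server server) {
    this.region = region;
    this.server = server;
  }

  // Mirrors the shape of handleFileNotFound() in the diff: callers write
  // "throw handleFileNotFound(e)", so every exit from here ends in a throw.
  IOException handleFileNotFound(FileNotFoundException fnfe) {
    try {
      // A store file vanished underneath the scanner (e.g. removed after a
      // compaction); reload the file list so a retried scan sees a
      // consistent view.
      region.refreshStoreFiles(true);
      return new IOException("unable to read store file", fnfe);
    } catch (IOException e) {
      // The refresh failed too; per the TODO in the patch there is no
      // per-region abort yet, so the whole regionserver goes down.
      server.abort("a store file got lost: " + fnfe.getMessage());
      return new IOException("region is closing", e);
    }
  }
}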
http://git-wip-us.apache.org/repos/asf/hbase/blob/fec091a8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c0a5d5e..42a378d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -143,6 +144,8 @@ public class StoreFileScanner implements KeyValueScanner {
skipKVsNewerThanReadpoint();
}
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch(IOException e) {
throw new IOException("Could not iterate " + this, e);
}
@@ -169,6 +172,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not seek " + this + " to key " + key, ioe);
}
@@ -193,6 +198,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not reseek " + this + " to key " + key,
ioe);
@@ -451,6 +458,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not seekToPreviousRow " + this + " to key "
+ key, ioe);
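
All four StoreFileScanner hunks apply the same one-line idea: Java dispatches to the first matching catch clause, so a catch for FileNotFoundException placed above the generic IOException handler lets it escape unwrapped, whereas the generic handler would bury it inside a new IOException and defeat the type-based handling in RegionScannerImpl. A hedged sketch of just that ordering, with readNextBlock() as a made-up stand-in for the underlying HFile read:

import java.io.FileNotFoundException;
import java.io.IOException;

class RethrowOrderSketch {
  // Made-up stand-in for an HFile block read that finds the file is gone.
  private void readNextBlock() throws IOException {
    throw new FileNotFoundException("store file moved away");
  }

  public void next() throws IOException {
    try {
      readNextBlock();
    } catch (FileNotFoundException e) {
      // Rethrow untouched: callers can still catch it by its concrete type.
      throw e;
    } catch (IOException e) {
      // Any other I/O failure keeps the descriptive wrapper, as before.
      throw new IOException("Could not iterate " + this, e);
    }
  }
}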
http://git-wip-us.apache.org/repos/asf/hbase/blob/fec091a8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..8c9312c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestCorruptedRegionStoreFile {
+ private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final String FAMILY_NAME_STR = "f";
+ private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+ private static final int NUM_FILES = 25;
+ private static final int ROW_PER_FILE = 2000;
+ private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+ @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+ private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+ private Path tableDir;
+ private int rowCount;
+
+ private static void setupConf(Configuration conf) {
+ conf.setLong("hbase.hstore.compaction.min", 20);
+ conf.setLong("hbase.hstore.compaction.max", 39);
+ conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+ }
+
+ private void setupTable(final TableName tableName) throws IOException {
+ // load the table
+ Table table = UTIL.createTable(tableName, FAMILY_NAME);
+ try {
+ rowCount = 0;
+ byte[] value = new byte[1024];
+ byte[] q = Bytes.toBytes("q");
+ while (rowCount < NUM_ROWS) {
+ Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+ put.setDurability(Durability.SKIP_WAL);
+ put.add(FAMILY_NAME, q, value);
+ table.put(put);
+
+ if ((rowCount++ % ROW_PER_FILE) == 0) {
+ // flush it
+ UTIL.getHBaseAdmin().flush(tableName);
+ }
+ }
+ } finally {
+ UTIL.getHBaseAdmin().flush(tableName);
+ table.close();
+ }
+
+ assertEquals(NUM_ROWS, rowCount);
+
+ // get the store file paths
+ storeFiles.clear();
+ tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+ FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+ @Override
+ public void storeFile(final String region, final String family, final String hfile)
+ throws IOException {
+ HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
+ storeFiles.add(link.getOriginPath());
+ }
+ });
+ assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+ LOG.info("store-files: " + storeFiles);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(2, 3);
+
+ setupTable(TEST_TABLE.getTableName());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Test(timeout=180000)
+ public void testLosingFileDuringScan() throws Exception {
+ assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScanNext(Table table) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+ count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ @Test(timeout=180000)
+ public void testLosingFileAfterScannerInit() throws Exception {
+ assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScan(Table table, Scan scan) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+ count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ private FileSystem getFileSystem() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ }
+
+ private Path getRootDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ }
+
+ private void evictHFileCache(final Path hfile) throws Exception {
+ for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ HRegionServer rs = rst.getRegionServer();
+ rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+ }
+ Thread.sleep(6000);
+ }
+
+ private int fullScanAndCount(final TableName tableName) throws Exception {
+ return fullScanAndCount(tableName, new ScanInjector());
+ }
+
+ private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+ throws Exception {
+ Table table = UTIL.getConnection().getTable(tableName);
+ int count = 0;
+ try {
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setCacheBlocks(false);
+ injector.beforeScan(table, scan);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ while (true) {
+ injector.beforeScanNext(table);
+ Result result = scanner.next();
+ injector.afterScanNext(table, result);
+ if (result == null) break;
+ if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+ LOG.debug("scan next " + count);
+ }
+ }
+ } finally {
+ scanner.close();
+ injector.afterScan(table);
+ }
+ } finally {
+ table.close();
+ }
+ return count;
+ }
+
+ private class ScanInjector {
+ protected void beforeScan(Table table, Scan scan) throws Exception {}
+ protected void beforeScanNext(Table table) throws Exception {}
+ protected void afterScanNext(Table table, Result result) throws Exception {}
+ protected void afterScan(Table table) throws Exception {}
+ }
+}
[2/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException
Posted by mb...@apache.org.
HBASE-13651 Handle StoreFileScanner FileNotFoundException
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6968834c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6968834c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6968834c
Branch: refs/heads/branch-1
Commit: 6968834c9c96c103e3a87f1be0dace49f2c9461e
Parents: 41aceca
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 19:04:22 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 19:04:22 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 114 ++++++---
.../hbase/regionserver/StoreFileScanner.java | 9 +
.../TestCorruptedRegionStoreFile.java | 249 +++++++++++++++++++
3 files changed, 333 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 66c09a8..1bb0865 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1108,7 +1108,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public long getNumMutationsWithoutWAL() {
return numMutationsWithoutWAL.get();
}
-
+
@Override
public long getDataInMemoryWithoutWAL() {
return dataInMemoryWithoutWAL.get();
@@ -2365,7 +2365,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.info(msg);
status.setStatus(msg);
- return new FlushResultImpl(compactionRequested ?
+ return new FlushResultImpl(compactionRequested ?
FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
flushOpSeqId);
@@ -4721,7 +4721,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public boolean refreshStoreFiles() throws IOException {
- if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ return refreshStoreFiles(false);
+ }
+
+ protected boolean refreshStoreFiles(boolean force) throws IOException {
+ if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return false; // if primary nothing to do
}
@@ -5200,14 +5204,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* If the joined heap data gathering is interrupted due to scan limits, this will
* contain the row for which we are populating the values.*/
protected Cell joinedContinuationRow = null;
+ private boolean filterClosed = false;
+
+ protected final int isScan;
protected final byte[] stopRow;
+ protected final HRegion region;
+
+ private final long readPt;
+ private final long maxResultSize;
+ private final ScannerContext defaultScannerContext;
private final FilterWrapper filter;
- private ScannerContext defaultScannerContext;
- protected int isScan;
- private boolean filterClosed = false;
- private long readPt;
- private long maxResultSize;
- protected HRegion region;
@Override
public HRegionInfo getRegionInfo() {
@@ -5216,7 +5222,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
-
this.region = region;
this.maxResultSize = scan.getMaxResultSize();
if (scan.hasFilter()) {
@@ -5257,10 +5262,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
scanners.addAll(additionalScanners);
}
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
- scan.getFamilyMap().entrySet()) {
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ KeyValueScanner scanner;
+ try {
+ scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
+ }
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
@@ -5352,7 +5361,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
-
+
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
@@ -5403,30 +5412,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean tmpKeepProgress = scannerContext.getKeepProgress();
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
- do {
- // We want to maintain any progress that is made towards the limits while scanning across
- // different column families. To do this, we toggle the keep progress flag on during calls
- // to the StoreScanner to ensure that any progress made thus far is not wiped away.
- scannerContext.setKeepProgress(true);
- heap.next(results, scannerContext);
- scannerContext.setKeepProgress(tmpKeepProgress);
-
- nextKv = heap.peek();
- moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-
- if (scannerContext.checkBatchLimit(limitScope)) {
- return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
- } else if (scannerContext.checkSizeLimit(limitScope)) {
- ScannerContext.NextState state =
- moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- } else if (scannerContext.checkTimeLimit(limitScope)) {
- ScannerContext.NextState state =
- moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- }
- } while (moreCellsInRow);
-
+ try {
+ do {
+ // We want to maintain any progress that is made towards the limits while scanning across
+ // different column families. To do this, we toggle the keep progress flag on during calls
+ // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+ scannerContext.setKeepProgress(true);
+ heap.next(results, scannerContext);
+ scannerContext.setKeepProgress(tmpKeepProgress);
+
+ nextKv = heap.peek();
+ moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+
+ if (scannerContext.checkBatchLimit(limitScope)) {
+ return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+ } else if (scannerContext.checkSizeLimit(limitScope)) {
+ ScannerContext.NextState state =
+ moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+ return scannerContext.setScannerState(state).hasMoreValues();
+ } else if (scannerContext.checkTimeLimit(limitScope)) {
+ ScannerContext.NextState state =
+ moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+ return scannerContext.setScannerState(state).hasMoreValues();
+ }
+ } while (moreCellsInRow);
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
+ }
return nextKv != null;
}
@@ -5753,18 +5765,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
boolean result = false;
startRegionOperation();
+ KeyValue kv = KeyValueUtil.createFirstOnRow(row);
try {
- KeyValue kv = KeyValueUtil.createFirstOnRow(row);
// use request seek to make use of the lazy seek option. See HBASE-5520
result = this.storeHeap.requestSeek(kv, true, true);
if (this.joinedHeap != null) {
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
+ } catch (FileNotFoundException e) {
+ throw handleFileNotFound(e);
} finally {
closeRegionOperation();
}
return result;
}
+
+ private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
+ // tries to refresh the store files, otherwise shutdown the RS.
+ // TODO: add support for abort() of a single region and trigger reassignment.
+ try {
+ region.refreshStoreFiles(true);
+ return new IOException("unable to read store file");
+ } catch (IOException e) {
+ String msg = "a store file got lost: " + fnfe.getMessage();
+ LOG.error(msg);
+ LOG.error("unable to refresh store files", e);
+ abortRegionServer(msg);
+ return new NotServingRegionException(getRegionInfo().getRegionNameAsString() +" is closing");
+ }
+ }
+
+ private void abortRegionServer(String msg) throws IOException {
+ if (rsServices instanceof HRegionServer) {
+ ((HRegionServer)rsServices).abort(msg);
+ }
+ throw new UnsupportedOperationException("not able to abort RS after: " + msg);
+ }
}
// Utility methods
http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c27b455..961352d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -142,6 +143,8 @@ public class StoreFileScanner implements KeyValueScanner {
skipKVsNewerThanReadpoint();
}
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch(IOException e) {
throw new IOException("Could not iterate " + this, e);
}
@@ -168,6 +171,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not seek " + this + " to key " + key, ioe);
}
@@ -192,6 +197,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not reseek " + this + " to key " + key,
ioe);
@@ -453,6 +460,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not seekToPreviousRow " + this + " to key "
+ key, ioe);
http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..dce19d6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(LargeTests.class)
+public class TestCorruptedRegionStoreFile {
+ private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final String FAMILY_NAME_STR = "f";
+ private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+ private static final int NUM_FILES = 25;
+ private static final int ROW_PER_FILE = 2000;
+ private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+ @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+ private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+ private Path tableDir;
+ private int rowCount;
+
+ private static void setupConf(Configuration conf) {
+ conf.setLong("hbase.hstore.compaction.min", 20);
+ conf.setLong("hbase.hstore.compaction.max", 39);
+ conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+ }
+
+ private void setupTable(final TableName tableName) throws IOException {
+ // load the table
+ Table table = UTIL.createTable(tableName, FAMILY_NAME);
+ try {
+ rowCount = 0;
+ byte[] value = new byte[1024];
+ byte[] q = Bytes.toBytes("q");
+ while (rowCount < NUM_ROWS) {
+ Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+ put.setDurability(Durability.SKIP_WAL);
+ put.add(FAMILY_NAME, q, value);
+ table.put(put);
+
+ if ((rowCount++ % ROW_PER_FILE) == 0) {
+ // flush it
+ UTIL.getHBaseAdmin().flush(tableName);
+ }
+ }
+ } finally {
+ UTIL.getHBaseAdmin().flush(tableName);
+ table.close();
+ }
+
+ assertEquals(NUM_ROWS, rowCount);
+
+ // get the store file paths
+ storeFiles.clear();
+ tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+ FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+ @Override
+ public void storeFile(final String region, final String family, final String hfile)
+ throws IOException {
+ HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
+ storeFiles.add(link.getOriginPath());
+ }
+ });
+ assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+ LOG.info("store-files: " + storeFiles);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(2, 3);
+
+ setupTable(TEST_TABLE.getTableName());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Test(timeout=180000)
+ public void testLosingFileDuringScan() throws Exception {
+ assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScanNext(Table table) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+ count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ @Test(timeout=180000)
+ public void testLosingFileAfterScannerInit() throws Exception {
+ assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScan(Table table, Scan scan) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+ count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ private FileSystem getFileSystem() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ }
+
+ private Path getRootDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ }
+
+ private void evictHFileCache(final Path hfile) throws Exception {
+ for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ HRegionServer rs = rst.getRegionServer();
+ rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+ }
+ Thread.sleep(6000);
+ }
+
+ private int fullScanAndCount(final TableName tableName) throws Exception {
+ return fullScanAndCount(tableName, new ScanInjector());
+ }
+
+ private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+ throws Exception {
+ Table table = UTIL.getConnection().getTable(tableName);
+ int count = 0;
+ try {
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setCacheBlocks(false);
+ injector.beforeScan(table, scan);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ while (true) {
+ injector.beforeScanNext(table);
+ Result result = scanner.next();
+ injector.afterScanNext(table, result);
+ if (result == null) break;
+ if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+ LOG.debug("scan next " + count);
+ }
+ }
+ } finally {
+ scanner.close();
+ injector.afterScan(table);
+ }
+ } finally {
+ table.close();
+ }
+ return count;
+ }
+
+ private class ScanInjector {
+ protected void beforeScan(Table table, Scan scan) throws Exception {}
+ protected void beforeScanNext(Table table) throws Exception {}
+ protected void afterScanNext(Table table, Result result) throws Exception {}
+ protected void afterScan(Table table) throws Exception {}
+ }
+}
[4/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException
Posted by mb...@apache.org.
HBASE-13651 Handle StoreFileScanner FileNotFoundException
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca254ed5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca254ed5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca254ed5
Branch: refs/heads/0.94
Commit: ca254ed51514d2f14fc1f751778fb82df423aab8
Parents: fb43461
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 19:09:40 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 19:09:40 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 37 ++-
.../hbase/regionserver/HRegionServer.java | 4 +
.../apache/hadoop/hbase/regionserver/Store.java | 1 +
.../hbase/regionserver/StoreFileScanner.java | 7 +
.../TestCorruptedRegionStoreFile.java | 244 +++++++++++++++++++
5 files changed, 285 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5da00dc..8f20376 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4024,7 +4024,13 @@ public class HRegion implements HeapSize { // , Writable{
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
+ KeyValueScanner scanner;
+ try {
+ scanner = store.getScanner(scan, entry.getValue());
+ } catch (FileNotFoundException e) {
+ abortRegionServer(e.getMessage());
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
+ }
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| FilterBase.isFamilyEssential(this.filter, entry.getKey())) {
scanners.add(scanner);
@@ -4152,13 +4158,18 @@ public class HRegion implements HeapSize { // , Writable{
private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
byte[] currentRow, int offset, short length, String metric) throws IOException {
KeyValue nextKv;
- do {
- heap.next(results, limit - results.size(), metric);
- if (limit > 0 && results.size() == limit) {
- return KV_LIMIT;
- }
- nextKv = heap.peek();
- } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+ try {
+ do {
+ heap.next(results, limit - results.size(), metric);
+ if (limit > 0 && results.size() == limit) {
+ return KV_LIMIT;
+ }
+ nextKv = heap.peek();
+ } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+ } catch (FileNotFoundException e) {
+ abortRegionServer(e.getMessage());
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
+ }
return nextKv;
}
@@ -4368,6 +4379,9 @@ public class HRegion implements HeapSize { // , Writable{
if (this.joinedHeap != null) {
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
+ } catch (FileNotFoundException e) {
+ abortRegionServer(e.getMessage());
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
} finally {
closeRegionOperation();
}
@@ -6034,6 +6048,13 @@ public class HRegion implements HeapSize { // , Writable{
lock.readLock().unlock();
}
+ public void abortRegionServer(String msg) throws IOException {
+ RegionServerServices rs = getRegionServerServices();
+ if (rs instanceof HRegionServer) {
+ ((HRegionServer)rs).abort(msg);
+ }
+ }
+
/**
* This method needs to be called before any public call that reads or
* modifies stores in bulk. It has to be called just before a try.
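
The 0.94 branch predates refreshStoreFiles(), so its version of the fix is blunter: any FileNotFoundException seen by a scan aborts the regionserver outright and reports the region as closing, leaving recovery to region reassignment. A minimal sketch of that older path; the Server interface is invented, and the committed code throws NotServingRegionException where this sketch throws a plain IOException:

import java.io.FileNotFoundException;
import java.io.IOException;

class LegacyAbortSketch {
  interface Server { void abort(String msg); }

  private final Server server;
  private final String regionName;

  LegacyAbortSketch(Server server, String regionName) {
    this.server = server;
    this.regionName = regionName;
  }

  void onFileNotFound(FileNotFoundException e) throws IOException {
    // No way to reload the store-file list on 0.94: take the server down
    // so the master reassigns the region, and tell the client to retry.
    server.abort(e.getMessage());
    throw new IOException(regionName + " is closing", e);
  }
}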
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index eadc9e8..5499135 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3446,6 +3446,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
return conf;
}
+ public CacheConfig getCacheConfig() {
+ return cacheConfig;
+ }
+
/** @return the write lock for the server */
ReentrantReadWriteLock.WriteLock getWriteLock() {
return lock.writeLock();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 7424788..3bd9f11 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index b055630..1594a0a 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -129,6 +130,8 @@ public class StoreFileScanner implements KeyValueScanner {
if (hasMVCCInfo)
skipKVsNewerThanReadpoint();
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch(IOException e) {
throw new IOException("Could not iterate " + this, e);
}
@@ -151,6 +154,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not seek " + this + " to key " + key, ioe);
}
@@ -171,6 +176,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not reseek " + this + " to key " + key,
ioe);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..c4e0a7d
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCorruptedRegionStoreFile {
+ private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final String FAMILY_NAME_STR = "f";
+ private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+ private static final int NUM_FILES = 25;
+ private static final int ROW_PER_FILE = 2000;
+ private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+ private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+ private Path tableDir;
+ private int rowCount;
+
+ private static void setupConf(Configuration conf) {
+ conf.setLong("hbase.hstore.compaction.min", 20);
+ conf.setLong("hbase.hstore.compaction.max", 39);
+ conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+ }
+
+ private void setupTable(final String tableName) throws Exception {
+ // load the table
+ HTable table = UTIL.createTable(Bytes.toBytes(tableName), FAMILY_NAME);
+ try {
+ rowCount = 0;
+ byte[] value = new byte[1024];
+ byte[] q = Bytes.toBytes("q");
+ while (rowCount < NUM_ROWS) {
+ Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+ put.setDurability(Durability.SKIP_WAL);
+ put.add(FAMILY_NAME, q, value);
+ table.put(put);
+
+ if ((rowCount++ % ROW_PER_FILE) == 0) {
+ // flush it
+ UTIL.getHBaseAdmin().flush(tableName);
+ }
+ }
+ } finally {
+ UTIL.getHBaseAdmin().flush(tableName);
+ table.close();
+ }
+
+ assertEquals(NUM_ROWS, rowCount);
+
+ // get the store file paths
+ storeFiles.clear();
+ tableDir = FSUtils.getTablePath(getRootDir(), tableName);
+ FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+ @Override
+ public void storeFile(final String region, final String family, final String hfile)
+ throws IOException {
+ HFileLink link = HFileLink.create(UTIL.getConfiguration(), tableName, region, family, hfile);
+ storeFiles.add(link.getOriginPath());
+ }
+ });
+ assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+ LOG.info("store-files: " + storeFiles);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(2, 3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Test(timeout=90000)
+ public void testLosingFileDuringScan() throws Exception {
+ final String tableName = "testLosingFileDuringScan";
+ setupTable(tableName);
+ assertEquals(rowCount, fullScanAndCount(tableName));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(tableName, new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScanNext(HTable table) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count, count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ @Test(timeout=90000)
+ public void testLosingFileAfterScannerInit() throws Exception {
+ final String tableName = "testLosingFileAfterScannerInit";
+ setupTable(tableName);
+ assertEquals(rowCount, fullScanAndCount(tableName));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(tableName, new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScan(HTable table, Scan scan) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count, count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ private FileSystem getFileSystem() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ }
+
+ private Path getRootDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ }
+
+ private void evictHFileCache(final Path hfile) throws Exception {
+ for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ HRegionServer rs = rst.getRegionServer();
+ rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+ }
+ Thread.sleep(6000);
+ }
+
+ private int fullScanAndCount(final String tableName) throws Exception {
+ return fullScanAndCount(tableName, new ScanInjector());
+ }
+
+ private int fullScanAndCount(final String tableName, final ScanInjector injector)
+ throws Exception {
+ HTable table = new HTable(UTIL.getConfiguration(), tableName);
+ int count = 0;
+ try {
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setCacheBlocks(false);
+ injector.beforeScan(table, scan);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ while (true) {
+ injector.beforeScanNext(table);
+ Result result = scanner.next();
+ injector.afterScanNext(table, result);
+ if (result == null) break;
+ if ((count++ % 1000) == 0) {
+ LOG.debug("scan next " + count);
+ }
+ }
+ } finally {
+ scanner.close();
+ injector.afterScan(table);
+ }
+ } finally {
+ table.close();
+ }
+ return count;
+ }
+
+ private class ScanInjector {
+ protected void beforeScan(HTable table, Scan scan) throws Exception {}
+ protected void beforeScanNext(HTable table) throws Exception {}
+ protected void afterScanNext(HTable table, Result result) throws Exception {}
+ protected void afterScan(HTable table) throws Exception {}
+ }
+}
[3/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException
Posted by mb...@apache.org.
HBASE-13651 Handle StoreFileScanner FileNotFoundException
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/42e3e37e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/42e3e37e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/42e3e37e
Branch: refs/heads/0.98
Commit: 42e3e37ee3b3d3fc3d348e3888f07237b680e594
Parents: 2810ba6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri May 15 19:08:42 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri May 15 19:08:42 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 39 ++-
.../apache/hadoop/hbase/regionserver/Store.java | 1 +
.../hbase/regionserver/StoreFileScanner.java | 7 +
.../TestCorruptedRegionStoreFile.java | 250 +++++++++++++++++++
4 files changed, 288 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8148da5..14a6ae8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
@@ -3971,7 +3972,13 @@ public class HRegion implements HeapSize { // , Writable{
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ KeyValueScanner scanner;
+ try {
+ scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+ } catch (FileNotFoundException e) {
+ abortRegionServer(e.getMessage());
+ throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
+ }
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
@@ -4105,14 +4112,18 @@ public class HRegion implements HeapSize { // , Writable{
private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
byte[] currentRow, int offset, short length) throws IOException {
KeyValue nextKv;
- do {
- heap.next(results, limit - results.size());
- if (limit > 0 && results.size() == limit) {
- return KV_LIMIT;
- }
- nextKv = heap.peek();
- } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
-
+ try {
+ do {
+ heap.next(results, limit - results.size());
+ if (limit > 0 && results.size() == limit) {
+ return KV_LIMIT;
+ }
+ nextKv = heap.peek();
+ } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+ } catch (FileNotFoundException e) {
+ abortRegionServer(e.getMessage());
+ throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
+ }
return nextKv;
}
@@ -4336,6 +4347,9 @@ public class HRegion implements HeapSize { // , Writable{
if (this.joinedHeap != null) {
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
+ } catch (FileNotFoundException e) {
+ abortRegionServer(e.getMessage());
+ throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
} finally {
closeRegionOperation();
}
@@ -6078,6 +6092,13 @@ public class HRegion implements HeapSize { // , Writable{
this.coprocessorHost = coprocessorHost;
}
+ public void abortRegionServer(String msg) throws IOException {
+ RegionServerServices rs = getRegionServerServices();
+ if (rs instanceof HRegionServer) {
+ ((HRegionServer)rs).abort(msg);
+ }
+ }
+
/**
* This method needs to be called before any public call that reads or
* modifies data. It has to be called just before a try.
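The three HRegion hunks above apply the same recovery rule to each read path.
Distilled into one hypothetical helper (abortRegionServer and
NotServingRegionException come from the patch; the helper itself is a sketch,
not committed code):

    private void handleLostStoreFile(FileNotFoundException e) throws IOException {
      // a store file vanished underneath an open scanner: the on-disk state of
      // this region can no longer be trusted, so fail fast and let the master
      // reassign the region to another server
      abortRegionServer(e.getMessage());
      throw new NotServingRegionException(region.getRegionNameAsString() + " is closing");
    }

The client sees the NotServingRegionException, which the HBase client already
retries, so the scan resumes once the region is reopened elsewhere.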
http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index a116a4f..c9d101f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index ac3d12f..e4116fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -138,6 +139,8 @@ public class StoreFileScanner implements KeyValueScanner {
if (hasMVCCInfo)
skipKVsNewerThanReadpoint();
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch(IOException e) {
throw new IOException("Could not iterate " + this, e);
}
@@ -160,6 +163,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not seek " + this + " to key " + key, ioe);
}
@@ -180,6 +185,8 @@ public class StoreFileScanner implements KeyValueScanner {
} finally {
realSeekDone = true;
}
+ } catch (FileNotFoundException e) {
+ throw e;
} catch (IOException ioe) {
throw new IOException("Could not reseek " + this + " to key " + key,
ioe);
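The rethrow-only catch blocks above are not dead code: FileNotFoundException is
a subclass of IOException, so without the dedicated catch the generic handler
would wrap it in a fresh IOException and the catch (FileNotFoundException e)
blocks in HRegion would never match. A self-contained illustration of the
ordering rule:

    import java.io.FileNotFoundException;
    import java.io.IOException;

    public class CatchOrder {
      static void read(boolean lost) throws IOException {
        try {
          if (lost) throw new FileNotFoundException("store file gone");
          throw new IOException("some other read failure");
        } catch (FileNotFoundException e) {
          throw e;  // propagate unwrapped so callers can match the exact type
        } catch (IOException e) {
          throw new IOException("could not read", e);  // others gain context
        }
      }

      public static void main(String[] args) {
        try {
          read(true);
        } catch (IOException e) {
          System.out.println(e.getClass().getSimpleName());  // FileNotFoundException
        }
      }
    }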
http://git-wip-us.apache.org/repos/asf/hbase/blob/42e3e37e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..d9d00f5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(LargeTests.class)
+public class TestCorruptedRegionStoreFile {
+ private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final String FAMILY_NAME_STR = "f";
+ private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+ private static final int NUM_FILES = 25;
+ private static final int ROW_PER_FILE = 2000;
+ private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+ @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+ private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+ private Path tableDir;
+ private int rowCount;
+
+ private static void setupConf(Configuration conf) {
+ conf.setLong("hbase.hstore.compaction.min", 20);
+ conf.setLong("hbase.hstore.compaction.max", 39);
+ conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+ }
+
+ private void setupTable(final TableName tableName) throws Exception {
+ // load the table
+ String tableNameStr = tableName.getNameAsString();
+ HTable table = UTIL.createTable(tableName, FAMILY_NAME);
+ try {
+ rowCount = 0;
+ byte[] value = new byte[1024];
+ byte[] q = Bytes.toBytes("q");
+ while (rowCount < NUM_ROWS) {
+ Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+ put.setDurability(Durability.SKIP_WAL);
+ put.add(FAMILY_NAME, q, value);
+ table.put(put);
+
+ if ((rowCount++ % ROW_PER_FILE) == 0) {
+ // flush it
+ UTIL.getHBaseAdmin().flush(tableNameStr);
+ }
+ }
+ } finally {
+ UTIL.getHBaseAdmin().flush(tableNameStr);
+ table.close();
+ }
+
+ assertEquals(NUM_ROWS, rowCount);
+
+ // get the store file paths
+ storeFiles.clear();
+ tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+ FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+ @Override
+ public void storeFile(final String region, final String family, final String hfile)
+ throws IOException {
+ HFileLink link = HFileLink.create(UTIL.getConfiguration(), tableName, region, family, hfile);
+ storeFiles.add(link.getOriginPath());
+ }
+ });
+ assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+ LOG.info("store-files: " + storeFiles);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(2, 3);
+
+ setupTable(TEST_TABLE.getTableName());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Test(timeout=180000)
+ public void testLosingFileDuringScan() throws Exception {
+ assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScanNext(HTable table) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+ count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ @Test(timeout=180000)
+ public void testLosingFileAfterScannerInit() throws Exception {
+ assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+ final FileSystem fs = getFileSystem();
+ final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+ // try to query with the missing file
+ int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+ private boolean hasFile = true;
+
+ @Override
+ public void beforeScan(HTable table, Scan scan) throws Exception {
+ // move the path away (now the region is corrupted)
+ if (hasFile) {
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ hasFile = false;
+ }
+ }
+ });
+ assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+ count >= (NUM_ROWS - ROW_PER_FILE));
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ private FileSystem getFileSystem() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ }
+
+ private Path getRootDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ }
+
+ private void evictHFileCache(final Path hfile) throws Exception {
+ for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ HRegionServer rs = rst.getRegionServer();
+ rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+ }
+ Thread.sleep(6000);
+ }
+
+ private int fullScanAndCount(final TableName tableName) throws Exception {
+ return fullScanAndCount(tableName, new ScanInjector());
+ }
+
+ private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+ throws Exception {
+ HTable table = new HTable(UTIL.getConfiguration(), tableName);
+ int count = 0;
+ try {
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setCacheBlocks(false);
+ injector.beforeScan(table, scan);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ while (true) {
+ injector.beforeScanNext(table);
+ Result result = scanner.next();
+ injector.afterScanNext(table, result);
+ if (result == null) break;
+ if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+ LOG.debug("scan next " + count);
+ }
+ }
+ } finally {
+ scanner.close();
+ injector.afterScan(table);
+ }
+ } finally {
+ table.close();
+ }
+ return count;
+ }
+
+ private class ScanInjector {
+ protected void beforeScan(HTable table, Scan scan) throws Exception {}
+ protected void beforeScanNext(HTable table) throws Exception {}
+ protected void afterScanNext(HTable table, Result result) throws Exception {}
+ protected void afterScan(HTable table) throws Exception {}
+ }
+}
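Why both tests assert count >= (NUM_ROWS - ROW_PER_FILE): losing a single
store file can cost at most one flush worth of rows. Worked out with the
constants defined above:

    NUM_ROWS  = NUM_FILES * ROW_PER_FILE = 25 * 2000 = 50000
    max loss  = one store file           = 2000 rows
    assertion : count >= 50000 - 2000    = 48000

Any count below 48000 would mean more than the single removed file went
missing; counts between 48000 and 50000 all pass, since how many rows from the
removed file were returned before it disappeared depends on timing.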