You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/20 13:00:22 UTC
[2/7] hbase git commit: HBASE-19468 FNFE during scans and flushes (Ram)
HBASE-19468 FNFE during scans and flushes (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d2534fc5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d2534fc5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d2534fc5
Branch: refs/heads/HBASE-19397
Commit: d2534fc5703c3e1a1f70d77a248eb24dffb67880
Parents: e67a67d
Author: Vasudevan <ra...@intel.com>
Authored: Wed Dec 20 11:09:12 2017 +0530
Committer: Vasudevan <ra...@intel.com>
Committed: Wed Dec 20 11:09:12 2017 +0530
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/StoreScanner.java | 25 ++++++++---
.../TestCompactedHFilesDischarger.java | 46 +++++++++++++++++++-
2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d2534fc5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 6abca13..571e2c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -148,7 +148,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Indicates whether there was flush during the course of the scan
private volatile boolean flushed = false;
// generally we get one file from a flush
- private final List<HStoreFile> flushedStoreFiles = new ArrayList<>(1);
+ private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
// Since CompactingMemstore is now default, we get three memstore scanners from a flush
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
// The current list of scanners
@@ -479,6 +479,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (withDelayedScannersClose) {
clearAndClose(scannersForDelayedClose);
clearAndClose(memStoreScannersAfterFlush);
+ clearAndClose(flushedstoreFileScanners);
if (this.heap != null) {
this.heap.close();
this.currentScanners.clear();
@@ -842,7 +843,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
flushLock.lock();
try {
flushed = true;
- flushedStoreFiles.addAll(sfs);
+ final boolean isCompaction = false;
+ boolean usePread = get || scanUsePread;
+ // See HBASE-19468, where the flushed files were getting compacted even before a scanner
+ // called next(). So it is better to create the scanners here rather than in the next() call.
+ // Ensure these scanners are properly closed whether or not the scan completes successfully.
+ // The scanners are created eagerly so that the ref counting is ticking on the newly created
+ // store files. For stream scanners this eager creation does not induce a performance
+ // penalty, because in scans (which use stream scanners) the next() call is bound to happen.
+ List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
+ isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
+ flushedstoreFileScanners.addAll(scanners);
if (!CollectionUtils.isEmpty(memStoreScanners)) {
clearAndClose(memStoreScannersAfterFlush);
memStoreScannersAfterFlush.addAll(memStoreScanners);
@@ -865,13 +876,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
List<KeyValueScanner> scanners;
flushLock.lock();
try {
- List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
- allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get,
- scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
+ List<KeyValueScanner> allScanners =
+ new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
+ allScanners.addAll(flushedstoreFileScanners);
allScanners.addAll(memStoreScannersAfterFlush);
scanners = selectScannersFrom(store, allScanners);
- // Clear the current set of flushed store files so that they don't get added again
- flushedStoreFiles.clear();
+ // Clear the current set of flushed store file scanners so that they don't get added again
+ flushedstoreFileScanners.clear();
memStoreScannersAfterFlush.clear();
} finally {
flushLock.unlock();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d2534fc5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
index 1cff934..44e97e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -333,6 +334,49 @@ public class TestCompactedHFilesDischarger {
assertTrue(compactedfiles.isEmpty());
}
+  /**
+   * Added with HBASE-19468 (FNFE during scans and flushes). Opens a region scanner over three
+   * flushed store files, reads one row, then flushes a fourth file, compacts the region and runs
+   * the {@link CompactedHFilesDischarger} chore before issuing the second next(). The second
+   * next() is expected to still return row2.
+   */
+  @Test
+  public void testStoreFileMissing() throws Exception {
+    // Write 3 records and create 3 store files.
+    write("row1");
+    region.flush(true);
+    write("row2");
+    region.flush(true);
+    write("row3");
+    region.flush(true);
+
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    RegionScanner scanner = region.getScanner(scan);
+    // Close the scanner even when an assertion or a region operation below throws, so a
+    // failing run does not leave the scanner open.
+    try {
+      List<Cell> res = new ArrayList<>();
+      // Read first item
+      scanner.next(res);
+      assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
+      res.clear();
+      // Create a new file in between scan nexts
+      write("row4");
+      region.flush(true);
+
+      // Compact the table
+      region.compact(true);
+
+      // Create the cleaner object and run its chore once
+      CompactedHFilesDischarger cleaner =
+          new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+      cleaner.chore();
+      // This issues scan next
+      scanner.next(res);
+      assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
+    } finally {
+      scanner.close();
+    }
+  }
+
+  /**
+   * Puts a single cell into the region. The row key is {@code row1} converted with
+   * {@link Bytes#toBytes(String)}; the cell is added under {@code fam} / {@code qual1} and its
+   * value is the same byte array as the row key.
+   */
+  private void write(String row1) throws IOException {
+    final byte[] rowBytes = Bytes.toBytes(row1);
+    region.put(new Put(rowBytes).addColumn(fam, qual1, rowBytes));
+  }
+
protected void countDown() {
// count down 3 times
latch.countDown();
@@ -366,7 +410,7 @@ public class TestCompactedHFilesDischarger {
try {
initiateScan(region);
} catch (IOException e) {
- // do nothing
+ e.printStackTrace();
}
}