You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/20 13:00:22 UTC

[2/7] hbase git commit: HBASE-19468 FNFE during scans and flushes (Ram)

HBASE-19468 FNFE during scans and flushes (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d2534fc5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d2534fc5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d2534fc5

Branch: refs/heads/HBASE-19397
Commit: d2534fc5703c3e1a1f70d77a248eb24dffb67880
Parents: e67a67d
Author: Vasudevan <ra...@intel.com>
Authored: Wed Dec 20 11:09:12 2017 +0530
Committer: Vasudevan <ra...@intel.com>
Committed: Wed Dec 20 11:09:12 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreScanner.java | 25 ++++++++---
 .../TestCompactedHFilesDischarger.java          | 46 +++++++++++++++++++-
 2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d2534fc5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 6abca13..571e2c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -148,7 +148,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   // Indicates whether there was flush during the course of the scan
   private volatile boolean flushed = false;
   // generally we get one file from a flush
-  private final List<HStoreFile> flushedStoreFiles = new ArrayList<>(1);
+  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
   // Since CompactingMemstore is now default, we get three memstore scanners from a flush
   private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
   // The current list of scanners
@@ -479,6 +479,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (withDelayedScannersClose) {
       clearAndClose(scannersForDelayedClose);
       clearAndClose(memStoreScannersAfterFlush);
+      clearAndClose(flushedstoreFileScanners);
       if (this.heap != null) {
         this.heap.close();
         this.currentScanners.clear();
@@ -842,7 +843,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     flushLock.lock();
     try {
       flushed = true;
-      flushedStoreFiles.addAll(sfs);
+      final boolean isCompaction = false;
+      boolean usePread = get || scanUsePread;
+      // See HBASE-19468, where the flushed files are getting compacted even before a scanner
+      // calls next(). So it is better to create the scanners here than in the next() call.
+      // Ensure these scanners are properly closed whether or not the scan completes successfully.
+      // The scanners are created eagerly so that ref counting starts on the newly created
+      // store files. For stream scanners this eager creation does not incur a performance
+      // penalty, because in scans (that use stream scanners) the next() call is bound to happen.
+      List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
+        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
+      flushedstoreFileScanners.addAll(scanners);
       if (!CollectionUtils.isEmpty(memStoreScanners)) {
         clearAndClose(memStoreScannersAfterFlush);
         memStoreScannersAfterFlush.addAll(memStoreScanners);
@@ -865,13 +876,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     List<KeyValueScanner> scanners;
     flushLock.lock();
     try {
-      List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
-      allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get,
-        scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
+      List<KeyValueScanner> allScanners =
+          new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
+      allScanners.addAll(flushedstoreFileScanners);
       allScanners.addAll(memStoreScannersAfterFlush);
       scanners = selectScannersFrom(store, allScanners);
-      // Clear the current set of flushed store files so that they don't get added again
-      flushedStoreFiles.clear();
+      // Clear the current set of flushed store file scanners so that they don't get added again
+      flushedstoreFileScanners.clear();
       memStoreScannersAfterFlush.clear();
     } finally {
       flushLock.unlock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d2534fc5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
index 1cff934..44e97e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -333,6 +334,49 @@ public class TestCompactedHFilesDischarger {
     assertTrue(compactedfiles.isEmpty());
   }
 
+  @Test
+  public void testStoreFileMissing() throws Exception {
+    // Write 3 records and create 3 store files.
+    write("row1");
+    region.flush(true);
+    write("row2");
+    region.flush(true);
+    write("row3");
+    region.flush(true);
+
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    RegionScanner scanner = region.getScanner(scan);
+    List<Cell> res = new ArrayList<Cell>();
+    // Read first item
+    scanner.next(res);
+    assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
+    res.clear();
+    // Create a new file in between scan nexts
+    write("row4");
+    region.flush(true);
+
+    // Compact the table
+    region.compact(true);
+
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+    cleaner.chore();
+    // Issue the next scan call now that the flush, compaction and discharger chore have run
+    scanner.next(res);
+    assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
+
+    scanner.close();
+  }
+
+  private void write(String row1) throws IOException {
+    byte[] row = Bytes.toBytes(row1);
+    Put put = new Put(row);
+    put.addColumn(fam, qual1, row);
+    region.put(put);
+  }
+
   protected void countDown() {
     // count down 3 times
     latch.countDown();
@@ -366,7 +410,7 @@ public class TestCompactedHFilesDischarger {
       try {
         initiateScan(region);
       } catch (IOException e) {
-        // do nothing
+        e.printStackTrace();
       }
     }