Posted to commits@hbase.apache.org by wc...@apache.org on 2022/11/29 12:22:35 UTC

[hbase] branch branch-2.4 updated: HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6e562bb84ba HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)
6e562bb84ba is described below

commit 6e562bb84bae96a757f4b3814592da8156be4955
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Fri Nov 25 21:58:23 2022 +0000

    HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)
    
    Signed-off-by: Peter Somogyi <ps...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HStore.java   |  4 +-
 .../hadoop/hbase/regionserver/TestHStore.java      | 67 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)
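
The race this commit closes: the scanner-creation path in HStore releases the store's
read lock after selecting the store files but before opening scanners on them. A
compaction that finishes and archives those files inside that window deletes them from
the filesystem, so the scanner open then fails with a FileNotFoundException. The fix
pins the selected files by bumping their refCount while the read lock is still held,
and drops the pin in a finally block once the scanners are open. A minimal,
self-contained sketch of that guard pattern follows; the class, field, and method
names are illustrative stand-ins, not HBase's actual API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative stand-in for a store file: it may only be archived once no
    // reader holds a reference to it.
    class StoreFileHandle {
      final AtomicInteger refCount = new AtomicInteger();
      volatile boolean archived;
    }

    class StoreSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final List<StoreFileHandle> files = new ArrayList<>();

      // Scanner side: pin the selected files while still holding the read lock,
      // so a concurrent compaction cannot archive them in the window between
      // releasing the lock and actually opening the per-file scanners.
      void openScanners() {
        List<StoreFileHandle> toScan;
        lock.readLock().lock();
        try {
          toScan = List.copyOf(files);
          toScan.forEach(f -> f.refCount.incrementAndGet());
        } finally {
          lock.readLock().unlock();
        }
        try {
          // ... open one scanner per file here; the files cannot vanish ...
        } finally {
          // Balance the temporary pin; opened scanners hold their own refs.
          toScan.forEach(f -> f.refCount.decrementAndGet());
        }
      }

      // Compaction side: archiving skips any file that still has readers.
      void archiveCompactedFiles() {
        lock.writeLock().lock();
        try {
          for (StoreFileHandle f : files) {
            if (f.refCount.get() == 0) {
              f.archived = true; // no scanner is opening or reading it
            }
          }
        } finally {
          lock.writeLock().unlock();
        }
      }
    }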

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index dda745311d1..fc3dfd340ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1273,10 +1273,10 @@ public class HStore
       storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
         includeStartRow, stopRow, includeStopRow);
       memStoreScanners = this.memstore.getScanners(readPt);
+      storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
     } finally {
       this.lock.readLock().unlock();
     }
-
     try {
       // First the store file scanners
 
@@ -1293,6 +1293,8 @@ public class HStore
     } catch (Throwable t) {
       clearAndClose(memStoreScanners);
       throw t instanceof IOException ? (IOException) t : new IOException(t);
+    } finally {
+      storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
     }
   }
 
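Note the shape of the change above: the refCount increment happens before the read
lock is released, and the matching decrement sits in a finally block so the pin is
dropped whether scanner creation succeeds or throws; the opened scanners take their
own references in the meantime. The new test below exercises exactly this window,
racing createScanner against a requestCompaction/compact/closeAndArchiveCompactedFiles
sequence and then asserting that each surviving StoreFileScanner's reader still holds
a single reference.
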
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 62fba4b6c30..d2e9ac944da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,12 +42,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,7 +104,9 @@ import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
 import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -1027,6 +1032,68 @@ public class TestHStore {
     verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
   }
 
+  @Test
+  public void testScanWithCompactionAfterFlush() throws Exception {
+    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      EverythingPolicy.class.getName());
+    init(name.getMethodName());
+
+    assertEquals(0, this.store.getStorefilesCount());
+
+    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
+    // add some data, flush
+    this.store.add(kv, null);
+    flush(1);
+    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
+    // add some data, flush
+    this.store.add(kv, null);
+    flush(2);
+    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
+    // add some data, flush
+    this.store.add(kv, null);
+    flush(3);
+
+    ExecutorService service = Executors.newFixedThreadPool(2);
+
+    Scan scan = new Scan(new Get(row));
+    Future<KeyValueScanner> scanFuture = service.submit(() -> {
+      try {
+        LOG.info(">>>> creating scanner");
+        return this.store.createScanner(scan,
+          new ScanInfo(HBaseConfiguration.create(),
+            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
+            Long.MAX_VALUE, 0, CellComparator.getInstance()),
+          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
+      } catch (IOException e) {
+        e.printStackTrace();
+        return null;
+      }
+    });
+    Future compactFuture = service.submit(() -> {
+      try {
+        LOG.info(">>>>>> starting compaction");
+        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
+        assertTrue(opCompaction.isPresent());
+        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
+        LOG.info(">>>>>> Compaction is finished");
+        this.store.closeAndArchiveCompactedFiles();
+        LOG.info(">>>>>> Compacted files deleted");
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    KeyValueScanner kvs = scanFuture.get();
+    compactFuture.get();
+    ((StoreScanner) kvs).currentScanners.forEach(s -> {
+      if (s instanceof StoreFileScanner) {
+        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
+      }
+    });
+    kvs.seek(kv);
+    service.shutdownNow();
+  }
+
   private long countMemStoreScanner(StoreScanner scanner) {
     if (scanner.currentScanners == null) {
       return 0;