You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2022/11/29 12:22:35 UTC
[hbase] branch branch-2.4 updated: HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 6e562bb84ba HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)
6e562bb84ba is described below
commit 6e562bb84bae96a757f4b3814592da8156be4955
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Fri Nov 25 21:58:23 2022 +0000
HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)
Signed-off-by: Peter Somogyi <ps...@apache.org>
---
.../apache/hadoop/hbase/regionserver/HStore.java | 4 +-
.../hadoop/hbase/regionserver/TestHStore.java | 67 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index dda745311d1..fc3dfd340ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1273,10 +1273,10 @@ public class HStore
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
+ storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
} finally {
this.lock.readLock().unlock();
}
-
try {
// First the store file scanners
@@ -1293,6 +1293,8 @@ public class HStore
} catch (Throwable t) {
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
+ } finally {
+ storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 62fba4b6c30..d2e9ac944da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,12 +42,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableSet;
+import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,7 +104,9 @@ import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -1027,6 +1032,68 @@ public class TestHStore {
verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
}
+ @Test
+ public void testScanWithCompactionAfterFlush() throws Exception {
+ TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+ EverythingPolicy.class.getName());
+ init(name.getMethodName());
+
+ assertEquals(0, this.store.getStorefilesCount());
+
+ KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
+ // add some data, flush
+ this.store.add(kv, null);
+ flush(1);
+ kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
+ // add some data, flush
+ this.store.add(kv, null);
+ flush(2);
+ kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
+ // add some data, flush
+ this.store.add(kv, null);
+ flush(3);
+
+ ExecutorService service = Executors.newFixedThreadPool(2);
+
+ Scan scan = new Scan(new Get(row));
+ Future<KeyValueScanner> scanFuture = service.submit(() -> {
+ try {
+ LOG.info(">>>> creating scanner");
+ return this.store.createScanner(scan,
+ new ScanInfo(HBaseConfiguration.create(),
+ ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
+ Long.MAX_VALUE, 0, CellComparator.getInstance()),
+ scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ });
+ Future compactFuture = service.submit(() -> {
+ try {
+ LOG.info(">>>>>> starting compaction");
+ Optional<CompactionContext> opCompaction = this.store.requestCompaction();
+ assertTrue(opCompaction.isPresent());
+ store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
+ LOG.info(">>>>>> Compaction is finished");
+ this.store.closeAndArchiveCompactedFiles();
+ LOG.info(">>>>>> Compacted files deleted");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+
+ KeyValueScanner kvs = scanFuture.get();
+ compactFuture.get();
+ ((StoreScanner) kvs).currentScanners.forEach(s -> {
+ if (s instanceof StoreFileScanner) {
+ assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
+ }
+ });
+ kvs.seek(kv);
+ service.shutdownNow();
+ }
+
private long countMemStoreScanner(StoreScanner scanner) {
if (scanner.currentScanners == null) {
return 0;