You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/01/01 13:06:09 UTC
[hbase] branch branch-2.5 updated: HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new efeec91 HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)
efeec91 is described below
commit efeec919ded0626c21abf12136500da8ec933c3d
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Jan 1 20:50:52 2022 +0800
HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/regionserver/CompactionPipeline.java | 2 +-
.../hbase/regionserver/ImmutableMemStoreLAB.java | 61 +++++++------
.../hadoop/hbase/regionserver/MemStoreLABImpl.java | 31 ++++---
.../hadoop/hbase/regionserver/TestHStore.java | 99 +++++++++++++++++++++-
.../regionserver/TestStoreScannerClosure.java | 2 +-
5 files changed, 154 insertions(+), 41 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 2576f78..eef25f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -68,7 +68,7 @@ public class CompactionPipeline {
/**
* <pre>
* Version is volatile to ensure it is atomically read when not using a lock.
- * To indicate whether the suffix of pipleline changes:
+ * To indicate whether the suffix of pipeline changes:
* 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
* added at Head, {@link #version} not change.
* 2.for {@link CompactionPipeline#swap},{@link #version} increase.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
index 0ce52b6..bc4bf78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.errorprone.annotations.RestrictedApi;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -31,13 +33,16 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class ImmutableMemStoreLAB implements MemStoreLAB {
- private final AtomicInteger openScannerCount = new AtomicInteger();
- private volatile boolean closed = false;
+ private final RefCnt refCnt;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private final List<MemStoreLAB> mslabs;
public ImmutableMemStoreLAB(List<MemStoreLAB> mslabs) {
this.mslabs = mslabs;
+ this.refCnt = RefCnt.create(() -> {
+ closeMSLABs();
+ });
}
@Override
@@ -85,47 +90,43 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
@Override
public void close() {
- // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this
- // MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of
+ // 'refCnt' here tracks the scanners opened on segments which directly refer to this
+ // MSLAB. The individual MSLABs this refers also having its own 'refCnt'. The usage of
// the variable in close() and decScannerCount() is as as that in HeapMemstoreLAB. Here the
// close just delegates the call to the individual MSLABs. The actual return of the chunks to
// MSLABPool will happen within individual MSLABs only (which is at the leaf level).
// Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some point and at that time
- // both of them were referred by ongoing scanners. So they have > 0 'openScannerCount'. Now over
- // the new Segment some scanners come in and this MSLABs 'openScannerCount' also goes up and
+ // both of them were referred by ongoing scanners. So they have > 0 'refCnt'. Now over
+ // the new Segment some scanners come in and this MSLABs 'refCnt' also goes up and
// then come down on finish of scanners. Now a close() call comes to this Immutable MSLAB. As
- // it's 'openScannerCount' is zero it will call close() on both of the Heap MSLABs. Say by that
+ // it's 'refCnt' is zero it will call close() on both of the Heap MSLABs. Say by that
// time the old scanners on one of the MSLAB got over where as on the other, still an old
// scanner is going on. The call close() on that MSLAB will not close it immediately but will
- // just mark it for closure as it's 'openScannerCount' still > 0. Later once the old scan is
+ // just decrease it's 'refCnt' and it's 'refCnt' still > 0. Later once the old scan is
// over, the decScannerCount() call will do the actual close and return of the chunks.
- this.closed = true;
+ if (!this.closed.compareAndSet(false, true)) {
+ return;
+ }
// When there are still on going scanners over this MSLAB, we will defer the close until all
- // scanners finish. We will just mark it for closure. See #decScannerCount(). This will be
- // called at end of every scan. When it is marked for closure and scanner count reached 0, we
- // will do the actual close then.
- checkAndCloseMSLABs(openScannerCount.get());
+ // scanners finish. We will just decrease it's 'refCnt'. See #decScannerCount(). This will be
+ // called at end of every scan. When it's 'refCnt' reached 0, we will do the actual close then.
+ this.refCnt.release();
}
- private void checkAndCloseMSLABs(int openScanners) {
- if (openScanners == 0) {
- for (MemStoreLAB mslab : this.mslabs) {
- mslab.close();
- }
+ private void closeMSLABs() {
+ for (MemStoreLAB mslab : this.mslabs) {
+ mslab.close();
}
}
@Override
public void incScannerCount() {
- this.openScannerCount.incrementAndGet();
+ this.refCnt.retain();
}
@Override
public void decScannerCount() {
- int count = this.openScannerCount.decrementAndGet();
- if (this.closed) {
- checkAndCloseMSLABs(count);
- }
+ this.refCnt.release();
}
@Override
@@ -138,5 +139,17 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
return ChunkCreator.getInstance().isOffheap();
}
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ int getRefCntValue() {
+ return this.refCnt.refCnt();
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ boolean isClosed() {
+ return this.closed.get();
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 5bc47dc..33174ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -18,13 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.errorprone.annotations.RestrictedApi;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,8 +86,11 @@ public class MemStoreLABImpl implements MemStoreLAB {
// This flag is for reclaiming chunks. Its set when putting chunks back to
// pool
private final AtomicBoolean reclaimed = new AtomicBoolean(false);
- // Current count of open scanners which reading data from this MemStoreLAB
- private final AtomicInteger openScannerCount = new AtomicInteger();
+ /**
+ * Its initial value is 1, so it is one bigger than the current count of open scanners which
+ * reading data from this MemStoreLAB.
+ */
+ private final RefCnt refCnt;
// Used in testing
public MemStoreLABImpl() {
@@ -100,6 +104,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(maxAlloc <= dataChunkSize,
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
+ this.refCnt = RefCnt.create(() -> {
+ recycleChunks();
+ });
// if user requested to work with MSLABs (whether on- or off-heap), then the
// immutable segments are going to use CellChunkMap as their index
@@ -264,14 +271,13 @@ public class MemStoreLABImpl implements MemStoreLAB {
}
// We could put back the chunks to pool for reusing only when there is no
// opening scanner which will read their data
- int count = openScannerCount.get();
- if(count == 0) {
- recycleChunks();
- }
+ this.refCnt.release();
}
- int getOpenScannerCount() {
- return this.openScannerCount.get();
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ int getRefCntValue() {
+ return this.refCnt.refCnt();
}
/**
@@ -279,7 +285,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public void incScannerCount() {
- this.openScannerCount.incrementAndGet();
+ this.refCnt.retain();
}
/**
@@ -287,10 +293,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public void decScannerCount() {
- int count = this.openScannerCount.decrementAndGet();
- if (this.closed.get() && count == 0) {
- recycleChunks();
- }
+ this.refCnt.release();
}
private void recycleChunks() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 9009cb2..2fdd627 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -2361,7 +2361,7 @@ public class TestHStore {
MemStoreLABImpl memStoreLAB =
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
- assertTrue(memStoreLAB.getOpenScannerCount() == 0);
+ assertTrue(memStoreLAB.getRefCntValue() == 0);
assertTrue(memStoreLAB.isReclaimed());
assertTrue(memStoreLAB.chunks.isEmpty());
StoreScanner storeScanner = null;
@@ -2436,6 +2436,103 @@ public class TestHStore {
}
}
+ /**
+ * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
+ */
+ @Test
+ public void testImmutableMemStoreLABRefCnt() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+
+ byte[] smallValue = new byte[3];
+ byte[] largeValue = new byte[9];
+ final long timestamp = EnvironmentEdgeManager.currentTime();
+ final long seqId = 100;
+ final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
+ final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
+ final Cell smallCell2 = createCell(qf3, timestamp, seqId+1, smallValue);
+ final Cell largeCell2 = createCell(qf4, timestamp, seqId+1, largeValue);
+ final Cell smallCell3 = createCell(qf5, timestamp, seqId+2, smallValue);
+ final Cell largeCell3 = createCell(qf6, timestamp, seqId+2, largeValue);
+
+ int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
+ int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
+ int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
+ int flushByteSize = firstWriteCellByteSize - 2;
+
+ // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+ conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
+ conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+ conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+ conf.setBoolean(WALFactory.WAL_ENABLED, false);
+
+ init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+ .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+ final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
+ assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+ myCompactingMemStore.allowCompaction.set(false);
+
+ NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+ store.add(smallCell1, memStoreSizing);
+ store.add(largeCell1, memStoreSizing);
+ store.add(smallCell2, memStoreSizing);
+ store.add(largeCell2, memStoreSizing);
+ store.add(smallCell3, memStoreSizing);
+ store.add(largeCell3, memStoreSizing);
+ VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
+ assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
+ List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
+ List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
+ for (ImmutableSegment segment : segments) {
+ memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
+ }
+ List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
+ for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+ assertTrue(memStoreLAB.getRefCntValue() == 2);
+ }
+
+ myCompactingMemStore.allowCompaction.set(true);
+ myCompactingMemStore.flushInMemory();
+
+ versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
+ assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
+ ImmutableMemStoreLAB immutableMemStoreLAB =
+ (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
+ for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+ assertTrue(memStoreLAB.getRefCntValue() == 2);
+ }
+
+ List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
+ for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+ assertTrue(memStoreLAB.getRefCntValue() == 2);
+ }
+ assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
+ for (KeyValueScanner scanner : scanners1) {
+ scanner.close();
+ }
+ for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+ assertTrue(memStoreLAB.getRefCntValue() == 1);
+ }
+ for (KeyValueScanner scanner : scanners2) {
+ scanner.close();
+ }
+ for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+ assertTrue(memStoreLAB.getRefCntValue() == 1);
+ }
+ assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
+ flushStore(store, id++);
+ for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+ assertTrue(memStoreLAB.getRefCntValue() == 0);
+ }
+ assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
+ assertTrue(immutableMemStoreLAB.isClosed());
+ for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+ assertTrue(memStoreLAB.isClosed());
+ assertTrue(memStoreLAB.isReclaimed());
+ assertTrue(memStoreLAB.chunks.isEmpty());
+ }
+ }
+
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index 23b1693..f20691b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -160,7 +160,7 @@ public class TestStoreScannerClosure {
memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
if (memStoreLAB != null) {
// There should be no unpooled chunks
- int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
+ int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue();
assertTrue("The memstore should not have unpooled chunks", refCount == 0);
}
}