You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/25 13:40:10 UTC
[hbase] branch branch-2 updated: HBASE-26465 MemStoreLAB may be released early when its SegmentScanner is scanning (#3859)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 5101f37 HBASE-26465 MemStoreLAB may be released early when its SegmentScanner is scanning (#3859)
5101f37 is described below
commit 5101f372e46cca30e64e55b99eb0b5e394409c98
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Nov 25 20:32:28 2021 +0800
HBASE-26465 MemStoreLAB may be released early when its SegmentScanner is scanning (#3859)
Signed-off-by: Duo Zhang <zh...@apache.org>
Reviewed-by: Anoop Sam John <an...@apache.org>
---
.../hbase/regionserver/AbstractMemStore.java | 6 +
.../hbase/regionserver/CompactingMemStore.java | 3 +
.../hadoop/hbase/regionserver/DefaultMemStore.java | 9 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 23 ++-
.../hadoop/hbase/regionserver/MemStoreLABImpl.java | 23 ++-
.../hadoop/hbase/regionserver/TestHStore.java | 225 +++++++++++++++++++++
.../hadoop/hbase/regionserver/TestMemStoreLAB.java | 5 +-
.../regionserver/TestStoreScannerClosure.java | 4 +-
8 files changed, 278 insertions(+), 20 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 4b923ff..56dab21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -232,6 +232,8 @@ public abstract class AbstractMemStore implements MemStore {
}
/**
+ * This method is protected under {@link HStore#lock} write lock,<br/>
+ * and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/>
* The passed snapshot was successfully persisted; it can be let go.
* @param id Id of the snapshot to clean out.
* @see MemStore#snapshot()
@@ -245,6 +247,10 @@ public abstract class AbstractMemStore implements MemStore {
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
+ doClearSnapShot();
+ }
+
+ protected void doClearSnapShot() {
Segment oldSnapshot = this.snapshot;
if (!this.snapshot.isEmpty()) {
this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index a9683ac..eac5fc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -394,6 +394,9 @@ public class CompactingMemStore extends AbstractMemStore {
return Bytes.toString(getFamilyNameInBytes());
}
+ /**
+ * This method is protected under {@link HStore#lock} read lock.
+ */
@Override
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
MutableSegment activeTmp = getActive();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index e38c5a3..76bef7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -132,16 +132,21 @@ public class DefaultMemStore extends AbstractMemStore {
}
@Override
- /*
+ /**
+ * This method is protected under {@link HStore#lock} read lock. <br/>
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<>();
addToScanners(getActive(), readPt, list);
- addToScanners(snapshot.getAllSegments(), readPt, list);
+ addToScanners(getSnapshotSegments(), readPt, list);
return list;
}
+ protected List<Segment> getSnapshotSegments() {
+ return snapshot.getAllSegments();
+ }
+
@Override
protected List<Segment> getSegments() throws IOException {
List<Segment> list = new ArrayList<>(2);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7aa55e0..df49e39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -150,8 +150,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
-
- protected final MemStore memstore;
+ /**
+ * TODO: After making the {@link DefaultMemStore} extensible in {@link HStore} by HBASE-26476,
+ * we can change it back to final.
+ */
+ protected MemStore memstore;
// This stores directory in the filesystem.
private final HRegion region;
protected Configuration conf;
@@ -1222,6 +1225,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
+ /**
+ * NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
+ * close {@link DefaultMemStore#snapshot}, which may be used by
+ * {@link DefaultMemStore#getScanners}.
+ */
+ if (snapshotId > 0) {
+ this.memstore.clearSnapshot(snapshotId);
+ }
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@@ -1230,13 +1241,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// the lock.
this.lock.writeLock().unlock();
}
- // We do not need to call clearSnapshot method inside the write lock.
- // The clearSnapshot itself is thread safe, which can be called at the same time with other
- // memstore operations expect snapshot and clearSnapshot. And for these two methods, in HRegion
- // we can guarantee that there is only one onging flush, so they will be no race.
- if (snapshotId > 0) {
- this.memstore.clearSnapshot(snapshotId);
- }
+
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 0d13e49..5bc47dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -81,10 +81,10 @@ public class MemStoreLABImpl implements MemStoreLAB {
// This flag is for closing this instance, its set when clearing snapshot of
// memstore
- private volatile boolean closed = false;
+ private final AtomicBoolean closed = new AtomicBoolean(false);;
// This flag is for reclaiming chunks. Its set when putting chunks back to
// pool
- private AtomicBoolean reclaimed = new AtomicBoolean(false);
+ private final AtomicBoolean reclaimed = new AtomicBoolean(false);
// Current count of open scanners which reading data from this MemStoreLAB
private final AtomicInteger openScannerCount = new AtomicInteger();
@@ -259,7 +259,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public void close() {
- this.closed = true;
+ if (!this.closed.compareAndSet(false, true)) {
+ return;
+ }
// We could put back the chunks to pool for reusing only when there is no
// opening scanner which will read their data
int count = openScannerCount.get();
@@ -286,7 +288,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
- if (this.closed && count == 0) {
+ if (this.closed.get() && count == 0) {
recycleChunks();
}
}
@@ -294,6 +296,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
private void recycleChunks() {
if (reclaimed.compareAndSet(false, true)) {
chunkCreator.putbackChunks(chunks);
+ chunks.clear();
}
}
@@ -409,13 +412,21 @@ public class MemStoreLABImpl implements MemStoreLAB {
return pooledChunks;
}
- Integer getNumOfChunksReturnedToPool() {
+ Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) {
int i = 0;
- for (Integer id : this.chunks) {
+ for (Integer id : chunksId) {
if (chunkCreator.isChunkInPool(id)) {
i++;
}
}
return i;
}
+
+ boolean isReclaimed() {
+ return reclaimed.get();
+ }
+
+ boolean isClosed() {
+ return closed.get();
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 1a0291a..ad40a98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -2160,6 +2161,122 @@ public class TestHStore {
}
}
+ /**
+ * <pre>
+ * This test is for HBASE-26465,
+ * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
+ * concurrently. The threads sequence before HBASE-26465 is:
+ * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have been added
+ * to {@link DefaultMemStore}.
+ * 2.The flush thread stops before {@link DefaultMemStore#clearSnapshot} in
+ * {@link HStore#updateStorefiles} after it has finished flushing the memStore to an hfile.
+ * 3.The scan thread starts and stops after {@link DefaultMemStore#getSnapshotSegments} in
+ * {@link DefaultMemStore#getScanners}; here the scan thread gets the
+ * {@link DefaultMemStore#snapshot} which was created by the flush thread.
+ * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and closes
+ * {@link DefaultMemStore#snapshot}. Because the reference count of the corresponding
+ * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in that {@link MemStoreLABImpl}
+ * are recycled.
+ * 5.The scan thread continues {@link DefaultMemStore#getScanners}, creates a
+ * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increases the
+ * reference count of the corresponding {@link MemStoreLABImpl}, but the {@link Chunk}s in
+ * that {@link MemStoreLABImpl} were already recycled in step 4, and these {@link Chunk}s may
+ * be overwritten by other write threads, which may cause serious problems.
+ * After HBASE-26465, {@link DefaultMemStore#getScanners} and
+ * {@link DefaultMemStore#clearSnapshot} can no longer execute concurrently.
+ * </pre>
+ */
+ @Test
+ public void testClearSnapshotGetScannerConcurrently() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+
+ byte[] smallValue = new byte[3];
+ byte[] largeValue = new byte[9];
+ final long timestamp = EnvironmentEdgeManager.currentTime();
+ final long seqId = 100;
+ final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+ final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ quals.add(qf1);
+ quals.add(qf2);
+
+ conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
+ conf.setBoolean(WALFactory.WAL_ENABLED, false);
+
+ init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+ MyDefaultMemStore myDefaultMemStore = new MyDefaultMemStore(store.conf, store.getComparator(),
+ store.getHRegion().getRegionServicesForStores());
+ store.memstore = myDefaultMemStore;
+ myDefaultMemStore.store = store;
+
+ MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+ store.add(smallCell, memStoreSizing);
+ store.add(largeCell, memStoreSizing);
+
+ final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
+ final Thread flushThread = new Thread(() -> {
+ try {
+ flushStore(store, id++);
+ } catch (Throwable exception) {
+ exceptionRef.set(exception);
+ }
+ });
+ flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
+ flushThread.start();
+
+ String oldThreadName = Thread.currentThread().getName();
+ StoreScanner storeScanner = null;
+ try {
+ Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
+
+ /**
+ * Wait for the flush thread to stop before {@link DefaultMemStore#doClearSnapShot}.
+ */
+ myDefaultMemStore.getScannerCyclicBarrier.await();
+
+ storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
+ flushThread.join();
+
+ if (myDefaultMemStore.shouldWait) {
+ SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
+ MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
+ assertTrue(memStoreLAB.isClosed());
+ assertTrue(!memStoreLAB.chunks.isEmpty());
+ assertTrue(!memStoreLAB.isReclaimed());
+
+ Cell cell1 = segmentScanner.next();
+ CellUtil.equals(smallCell, cell1);
+ Cell cell2 = segmentScanner.next();
+ CellUtil.equals(largeCell, cell2);
+ assertNull(segmentScanner.next());
+ } else {
+ List<Cell> results = new ArrayList<>();
+ storeScanner.next(results);
+ assertEquals(2, results.size());
+ CellUtil.equals(smallCell, results.get(0));
+ CellUtil.equals(largeCell, results.get(1));
+ }
+ assertTrue(exceptionRef.get() == null);
+ } finally {
+ if (storeScanner != null) {
+ storeScanner.close();
+ }
+ Thread.currentThread().setName(oldThreadName);
+ }
+ }
+
+ private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
+ List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
+ for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
+ if (keyValueScanner instanceof SegmentScanner) {
+ segmentScanners.add((SegmentScanner) keyValueScanner);
+ }
+ }
+
+ assertTrue(segmentScanners.size() == 1);
+ return segmentScanners.get(0);
+ }
+
@Test
public void testOnConfigurationChange() throws IOException {
final int COMMON_MAX_FILES_TO_COMPACT = 10;
@@ -2861,4 +2978,112 @@ public class TestHStore {
}
}
}
+
+ public static class MyDefaultMemStore extends DefaultMemStore {
+ private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
+ private static final String FLUSH_THREAD_NAME = "flushMyThread";
+ /**
+ * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
+ * could start.
+ */
+ private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * Lets the getScanner thread tell the flush thread {@link DefaultMemStore#getSnapshotSegments}
+ * has completed, so {@link DefaultMemStore#doClearSnapShot} can continue.
+ */
+ private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * Lets the flush thread tell the getScanner thread {@link DefaultMemStore#doClearSnapShot}
+ * has completed, so {@link DefaultMemStore#getScanners} can continue.
+ */
+ private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
+ private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
+ private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
+ private volatile boolean shouldWait = true;
+ private volatile HStore store = null;
+
+ public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
+ RegionServicesForStores regionServices)
+ throws IOException {
+ super(conf, cellComparator, regionServices);
+ }
+
+ @Override
+ protected List<Segment> getSnapshotSegments() {
+
+ List<Segment> result = super.getSnapshotSegments();
+
+ if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
+ int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
+ if (currentCount == 1) {
+ if (this.shouldWait) {
+ try {
+ /**
+ * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
+ * {@link DefaultMemStore#doClearSnapShot} could continue.
+ */
+ preClearSnapShotCyclicBarrier.await();
+ /**
+ * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
+ */
+ postClearSnapShotCyclicBarrier.await();
+
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+
+ @Override
+ protected void doClearSnapShot() {
+ if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
+ int currentCount = clearSnapshotCounter.incrementAndGet();
+ if (currentCount == 1) {
+ try {
+ if (store.lock.isWriteLockedByCurrentThread()) {
+ shouldWait = false;
+ }
+ /**
+ * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
+ * thread could start.
+ */
+ getScannerCyclicBarrier.await();
+
+ if (shouldWait) {
+ /**
+ * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
+ */
+ preClearSnapShotCyclicBarrier.await();
+ }
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ super.doClearSnapShot();
+
+ if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
+ int currentCount = clearSnapshotCounter.get();
+ if (currentCount == 1) {
+ if (shouldWait) {
+ try {
+ /**
+ * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
+ * {@link DefaultMemStore#getScanners} could continue.
+ */
+ postClearSnapShotCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ }
+
+
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index d9b1035..0c1bad3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -24,9 +24,11 @@ import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
@@ -256,11 +258,12 @@ public class TestMemStoreLAB {
// none of the chunkIds would have been returned back
assertTrue("All the chunks must have been cleared",
ChunkCreator.instance.numberOfMappedChunks() != 0);
+ Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks);
int pooledChunksNum = mslab.getPooledChunks().size();
// close the mslab
mslab.close();
// make sure all chunks where reclaimed back to pool
- int queueLength = mslab.getNumOfChunksReturnedToPool();
+ int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds);
assertTrue("All chunks in chunk queue should be reclaimed or removed"
+ " after mslab closed but actually: " + (pooledChunksNum-queueLength),
pooledChunksNum-queueLength == 0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index 46e5b06..23b1693 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -160,8 +160,8 @@ public class TestStoreScannerClosure {
memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
if (memStoreLAB != null) {
// There should be no unpooled chunks
- int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
- assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0);
+ int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
+ assertTrue("The memstore should not have unpooled chunks", refCount == 0);
}
}
}