You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/08/12 15:30:29 UTC
[hbase] branch branch-2 updated: HBASE-26026 HBase Write may be
stuck forever when using CompactingMemStore (#3421)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 725474b HBASE-26026 HBase Write may be stuck forever when using CompactingMemStore (#3421)
725474b is described below
commit 725474bbe3b21fcee16cda56bcf9ac5268ed2bd7
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Aug 12 23:11:19 2021 +0800
HBASE-26026 HBase Write may be stuck forever when using CompactingMemStore (#3421)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/regionserver/AbstractMemStore.java | 6 +-
.../hbase/regionserver/CompactingMemStore.java | 27 +++-
.../hbase/regionserver/CompactionPipeline.java | 10 +-
.../hbase/regionserver/MemStoreCompactor.java | 3 +-
.../hbase/regionserver/TestCompactingMemStore.java | 8 +-
.../hbase/regionserver/TestDefaultMemStore.java | 21 +++-
.../hadoop/hbase/regionserver/TestHStore.java | 140 +++++++++++++++++++++
7 files changed, 202 insertions(+), 13 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 556cf04..4b923ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -70,7 +70,9 @@ public abstract class AbstractMemStore implements MemStore {
protected static void addToScanners(Segment segment, long readPt,
List<KeyValueScanner> scanners) {
- scanners.add(segment.getScanner(readPt));
+ if (!segment.isEmpty()) {
+ scanners.add(segment.getScanner(readPt));
+ }
}
protected AbstractMemStore(final Configuration conf, final CellComparator c,
@@ -156,7 +158,7 @@ public abstract class AbstractMemStore implements MemStore {
}
}
- private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
+ protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
boolean mslabUsed = (toAdd != cell);
// This cell data is backed by the same byte[] where we read request in RPC(See
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 4ee2db9..973d5a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -453,9 +453,14 @@ public class CompactingMemStore extends AbstractMemStore {
inMemoryCompaction();
}
- private void flushInMemory(MutableSegment currActive) {
+ protected void flushInMemory(MutableSegment currActive) {
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
- pushActiveToPipeline(currActive);
+ // NOTE: Due to concurrent writes, and because we first add the cell size to
+ // currActive.getDataSize and only then add the cell to currActive.cellSet, it is possible
+ // that currActive.getDataSize cannot accommodate cellToAdd while currActive.cellSet is
+ // still empty, because pending writes have not yet added their cells to currActive.cellSet.
+ // So here we must not check whether currActive is empty.
+ pushActiveToPipeline(currActive, false);
}
void inMemoryCompaction() {
@@ -524,7 +529,23 @@ public class CompactingMemStore extends AbstractMemStore {
}
protected void pushActiveToPipeline(MutableSegment currActive) {
- if (!currActive.isEmpty()) {
+ pushActiveToPipeline(currActive, true);
+ }
+
+ /**
+ * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
+ * concurrent writes, and because we first add the cell size to currActive.getDataSize and only
+ * then add the cell to currActive.cellSet, it is possible that currActive.getDataSize cannot
+ * accommodate cellToAdd while currActive.cellSet is still empty, because pending writes have not
+ * yet added their cells to currActive.cellSet. So when called from
+ * {@link CompactingMemStore#flushInMemory(MutableSegment)}, the checkEmpty parameter is false.
+ * But when called from {@link CompactingMemStore#snapshot}, there are no pending writes, so the
+ * checkEmpty parameter can be true.
+ * @param currActive
+ * @param checkEmpty
+ */
+ protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) {
+ if (!checkEmpty || !currActive.isEmpty()) {
pipeline.pushHead(currActive);
resetActive();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 711cfd3..194e065 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -223,10 +223,16 @@ public class CompactionPipeline {
LOG.warn("Segment flattening failed, because versions do not match");
return false;
}
- int i = 0;
+ int i = -1;
for (ImmutableSegment s : pipeline) {
+ i++;
if ( s.canBeFlattened() ) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
+ if (s.isEmpty()) {
+ // After s.waitForUpdates() returns, no preceding updates remain; if s contains no
+ // cells, we can skip it.
+ continue;
+ }
// size to be updated
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
@@ -242,9 +248,7 @@ public class CompactionPipeline {
LOG.debug("Compaction pipeline segment {} flattened", s);
return true;
}
- i++;
}
-
}
// do not update the global memstore size counter and do not increase the version,
// because all the cells remain in place
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 4df9ae0..4a99430 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -208,7 +208,8 @@ public class MemStoreCompactor {
MemStoreSegmentsIterator iterator = null;
List<ImmutableSegment> segments = versionedList.getStoreSegments();
for (ImmutableSegment s : segments) {
- s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
+ s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed.
+ // Empty segments are skipped when the MemStoreSegmentsIterator is created below.
}
switch (action) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 38fa587..297f5cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -176,11 +176,15 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
// use case 1: both kvs in kvset
this.memstore.add(kv1.clone(), null);
this.memstore.add(kv2.clone(), null);
- verifyScanAcrossSnapshot2(kv1, kv2);
+ // The snapshot is empty and the active segment is not; the
+ // empty segment is skipped.
+ verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 2: both kvs in snapshot
this.memstore.snapshot();
- verifyScanAcrossSnapshot2(kv1, kv2);
+ // The active segment is empty and the snapshot is not; the
+ // empty segment is skipped.
+ verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 3: first in snapshot second in kvset
this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 973547d..0dc35c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -255,11 +255,16 @@ public class TestDefaultMemStore {
// use case 1: both kvs in kvset
this.memstore.add(kv1.clone(), null);
this.memstore.add(kv2.clone(), null);
- verifyScanAcrossSnapshot2(kv1, kv2);
+ // The snapshot is empty and the active segment is not; the
+ // empty segment is skipped.
+ verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 2: both kvs in snapshot
+ // The active segment is empty and the snapshot is not; the
+ // empty segment is skipped.
this.memstore.snapshot();
- verifyScanAcrossSnapshot2(kv1, kv2);
+ //
+ verifyOneScanAcrossSnapshot2(kv1, kv2);
// use case 3: first in snapshot second in kvset
this.memstore = new DefaultMemStore();
@@ -288,6 +293,18 @@ public class TestDefaultMemStore {
assertNull(scanner1.next());
}
+ protected void verifyOneScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException {
+ List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
+ assertEquals(1, memstorescanners.size());
+ final KeyValueScanner scanner0 = memstorescanners.get(0);
+ scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+ Cell n0 = scanner0.next();
+ Cell n1 = scanner0.next();
+ assertTrue(kv1.equals(n0));
+ assertTrue(kv2.equals(n1));
+ assertNull(scanner0.next());
+ }
+
protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
throws IOException {
scanner.seek(KeyValueUtil.createFirstOnRow(new byte[]{}));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index e024eec..0e0a352 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -43,6 +43,7 @@ import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@@ -1726,6 +1727,67 @@ public class TestHStore {
assertArrayEquals(table, hFileContext.getTableName());
}
+ @Test
+ public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException {
+ Configuration conf = HBaseConfiguration.create();
+
+ byte[] smallValue = new byte[3];
+ byte[] largeValue = new byte[9];
+ final long timestamp = EnvironmentEdgeManager.currentTime();
+ final long seqId = 100;
+ final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+ final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+ int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+ int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+ int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
+
+ // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+ conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
+ conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+ conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+
+ init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+ .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+ MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
+ assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+ myCompactingMemStore.smallCellPreUpdateCounter.set(0);
+ myCompactingMemStore.smallCellPostUpdateCounter.set(0);
+ myCompactingMemStore.largeCellPreUpdateCounter.set(0);
+ myCompactingMemStore.largeCellPostUpdateCounter.set(0);
+
+ Thread smallCellThread = new Thread(() -> {
+ store.add(smallCell, new NonThreadSafeMemStoreSizing());
+ });
+ smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
+ smallCellThread.start();
+
+ String oldThreadName = Thread.currentThread().getName();
+ try {
+ /**
+ * 1.smallCellThread enters CompactingMemStore.shouldFlushInMemory first, when largeCellThread
+ * enters CompactingMemStore.shouldFlushInMemory, CompactingMemStore.active.getDataSize could
+ * not accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory return true.
+ * <p/>
+ * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
+ * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
+ * method, CompactingMemStore.active has no cell.
+ */
+ Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
+ store.add(largeCell, new NonThreadSafeMemStoreSizing());
+ smallCellThread.join();
+
+ for (int i = 0; i < 100; i++) {
+ long currentTimestamp = timestamp + 100 + i;
+ Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
+ store.add(cell, new NonThreadSafeMemStoreSizing());
+ }
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+
+ }
+
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
@@ -1925,4 +1987,82 @@ public class TestHStore {
@Override
public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
}
+
+ public static class MyCompactingMemStore2 extends CompactingMemStore {
+ private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
+ private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
+ private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
+ private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
+ private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
+ private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
+ private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0);
+ private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0);
+
+ public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
+ HStore store, RegionServicesForStores regionServices,
+ MemoryCompactionPolicy compactionPolicy) throws IOException {
+ super(conf, cellComparator, store, regionServices, compactionPolicy);
+ }
+
+ protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
+ MemStoreSizing memstoreSizing) {
+ if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
+ int currentCount = largeCellPreUpdateCounter.incrementAndGet();
+ if (currentCount <= 1) {
+ try {
+ /**
+ * smallCellThread enters super.shouldFlushInMemory first, when largeCellThread enters
+ * super.shouldFlushInMemory, currActive.getDataSize could not accommodate cellToAdd and
+ * super.shouldFlushInMemory return true.
+ */
+ preCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing);
+ if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+ try {
+ preCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return returnValue;
+ }
+
+ @Override
+ protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
+ if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+ try {
+ /**
+ * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
+ * currentActive . That is to say when largeCellThread called flushInMemory method,
+ * currentActive has no cell.
+ */
+ postCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ super.doAdd(currentActive, cell, memstoreSizing);
+ }
+
+ @Override
+ protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
+ super.flushInMemory(currentActiveMutableSegment);
+ if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
+ if (largeCellPreUpdateCounter.get() <= 1) {
+ try {
+ postCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ }
}