You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/01 15:05:55 UTC
[hbase] branch master updated: HBASE-26384 Segment already flushed
to hfile may still be remained in CompactingMemStore (#3777)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 558ab92 HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3777)
558ab92 is described below
commit 558ab925ed2ba567c3b533516799ddbaf2471516
Author: chenglei <ch...@apache.org>
AuthorDate: Mon Nov 1 23:05:15 2021 +0800
HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3777)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/regionserver/CompactingMemStore.java | 8 +-
.../hbase/regionserver/CompactionPipeline.java | 56 ++-
.../hadoop/hbase/regionserver/TestHStore.java | 509 +++++++++++++++++++++
3 files changed, 568 insertions(+), 5 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 5da0de9..a9683ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -567,12 +567,12 @@ public class CompactingMemStore extends AbstractMemStore {
boolean done = false;
while (!done) {
iterationsCnt++;
- VersionedSegmentsList segments = pipeline.getVersionedList();
+ VersionedSegmentsList segments = getImmutableSegments();
pushToSnapshot(segments.getStoreSegments());
// swap can return false in case the pipeline was updated by ongoing compaction
// and the version increase, the chance of it happenning is very low
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
- done = pipeline.swap(segments, null, false, false);
+ done = swapPipelineWithNull(segments);
if (iterationsCnt>2) {
// practically it is impossible that this loop iterates more than two times
// (because the compaction is stopped and none restarts it while in snapshot request),
@@ -585,6 +585,10 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
+ protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
+ return pipeline.swap(segments, null, false, false);
+ }
+
private void pushToSnapshot(List<ImmutableSegment> segments) {
if(segments.isEmpty()) return;
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 9655297..5f52b95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.ListIterator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -64,7 +65,16 @@ public class CompactionPipeline {
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
// The list is volatile to avoid reading a new allocated reference before the c'tor is executed
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
- // Version is volatile to ensure it is atomically read when not using a lock
+ /**
+ * <pre>
+ * Version is volatile to ensure it is atomically read when not using a lock.
+ * To indicate whether the suffix of pipleline changes:
+ * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
+ * added at Head, {@link #version} not change.
+ * 2.for {@link CompactionPipeline#swap},{@link #version} increase.
+ * 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
+ * </pre>
+ */
private volatile long version = 0;
public CompactionPipeline(RegionServicesForStores region) {
@@ -290,10 +300,15 @@ public class CompactionPipeline {
return memStoreSizing.getMemStoreSize();
}
+ /**
+ * Must be called under the {@link CompactionPipeline#pipeline} Lock.
+ */
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
- pipeline.removeAll(suffix);
- if(segment != null) pipeline.addLast(segment);
+ matchAndRemoveSuffixFromPipeline(suffix);
+ if (segment != null) {
+ pipeline.addLast(segment);
+ }
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
@@ -307,6 +322,41 @@ public class CompactionPipeline {
}
}
+ /**
+ * Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
+ * {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
+ * suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
+ * Must be called under the {@link CompactionPipeline#pipeline} Lock.
+ */
+ private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
+ if (suffix.isEmpty()) {
+ return;
+ }
+ if (pipeline.size() < suffix.size()) {
+ throw new IllegalStateException(
+ "CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
+ + "],pipeline size must greater than or equals suffix size");
+ }
+
+ ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
+ ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
+ int count = 0;
+ while (suffixIterator.hasPrevious()) {
+ Segment suffixSegment = suffixIterator.previous();
+ Segment pipelineSegment = pipelineIterator.previous();
+ if (suffixSegment != pipelineSegment) {
+ throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
+ + " is not pipleline segment:[" + pipelineSegment + "]");
+ }
+ count++;
+ }
+
+ for (int index = 1; index <= count; index++) {
+ pipeline.pollLast();
+ }
+
+ }
+
// replacing one segment in the pipeline with a new one exactly at the same index
// need to be called only within synchronized block
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index b64f521..6618742 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
+import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@@ -1921,6 +1922,225 @@ public class TestHStore {
}
}
+ /**
+ * <pre>
+ * This test is for HBASE-26384,
+ * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
+ * execute concurrently.
+ * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
+ * for both branch-2 and master):
+ * 1. The {@link CompactingMemStore} size exceeds
+ * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
+ * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
+ * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
+ * 2. The in memory compact thread starts and then stopping before
+ * {@link CompactingMemStore#flattenOneSegment}.
+ * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
+ * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
+ * compact thread continues.
+ * Assuming {@link VersionedSegmentsList#version} returned from
+ * {@link CompactingMemStore#getImmutableSegments} is v.
+ * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
+ * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+ * {@link CompactionPipeline#version} is still v.
+ * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+ * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
+ * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
+ * {@link CompactionPipeline} has changed because
+ * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
+ * removed in fact and still remaining in {@link CompactionPipeline}.
+ *
+ * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
+ * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+ * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
+ * v+1.
+ * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+ * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
+ * failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
+ * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
+ * {@link CompactingMemStore#swapPipelineWithNull} succeeds.
+ * </pre>
+ */
+ @Test
+ public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+
+ byte[] smallValue = new byte[3];
+ byte[] largeValue = new byte[9];
+ final long timestamp = EnvironmentEdgeManager.currentTime();
+ final long seqId = 100;
+ final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+ final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+ int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+ int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+ int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
+ int flushByteSize = totalCellByteSize - 2;
+
+ // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+ conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
+ conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+ conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+
+ init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+ .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+ MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
+ assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+
+ store.add(smallCell, new NonThreadSafeMemStoreSizing());
+ store.add(largeCell, new NonThreadSafeMemStoreSizing());
+
+ String oldThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
+ /**
+ * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
+ * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
+ * would invoke {@link CompactingMemStore#stopCompaction}.
+ */
+ myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
+
+ MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
+ myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
+
+ assertTrue(memStoreSnapshot.getCellsCount() == 2);
+ assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
+ VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
+ assertTrue(segments.getNumOfSegments() == 0);
+ assertTrue(segments.getNumOfCells() == 0);
+ assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
+ assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+ }
+
+ /**
+ * <pre>
+ * This test is for HBASE-26384,
+ * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
+ * and writeMemStore execute concurrently.
+ * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
+ * for both branch-2 and master):
+ * 1. The {@link CompactingMemStore} size exceeds
+ * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
+ * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a
+ * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
+ * 2. The in memory compact thread starts and then stopping before
+ * {@link CompactingMemStore#flattenOneSegment}.
+ * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
+ * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
+ * compact thread continues.
+ * Assuming {@link VersionedSegmentsList#version} returned from
+ * {@link CompactingMemStore#getImmutableSegments} is v.
+ * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
+ * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+ * {@link CompactionPipeline#version} is still v.
+ * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+ * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
+ * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
+ * {@link CompactionPipeline} has changed because
+ * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
+ * removed in fact and still remaining in {@link CompactionPipeline}.
+ *
+ * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
+ * and I add step 7-8 to test there is new segment added before retry.
+ * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
+ * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
+ * v+1.
+ * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+ * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
+ * failed and retry,{@link VersionedSegmentsList#version} returned from
+ * {@link CompactingMemStore#getImmutableSegments} is v+1.
+ * 7. The write thread continues writing to {@link CompactingMemStore} and
+ * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
+ * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
+ * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
+ * {@link CompactionPipeline#version} is still v+1.
+ * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
+ * {@link CompactionPipeline#version} is still v+1,
+ * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
+ * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
+ * {@link CompactingMemStore#swapPipelineWithNull}.
+ * </pre>
+ */
+ @Test
+ public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+
+ byte[] smallValue = new byte[3];
+ byte[] largeValue = new byte[9];
+ final long timestamp = EnvironmentEdgeManager.currentTime();
+ final long seqId = 100;
+ final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+ final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+ int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+ int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+ int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
+ int flushByteSize = firstWriteCellByteSize - 2;
+
+ // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+ conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
+ conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+ conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+
+ init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+ .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+ final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
+ assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+
+ store.add(smallCell, new NonThreadSafeMemStoreSizing());
+ store.add(largeCell, new NonThreadSafeMemStoreSizing());
+
+ final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
+ final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
+ final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
+ final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1)
+ + MutableSegment.getCellLength(writeAgainCell2);
+ final Thread writeAgainThread = new Thread(() -> {
+ try {
+ myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
+
+ store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
+ store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
+
+ myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
+ } catch (Throwable exception) {
+ exceptionRef.set(exception);
+ }
+ });
+ writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
+ writeAgainThread.start();
+
+ String oldThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
+ /**
+ * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
+ * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
+ * would invoke {@link CompactingMemStore#stopCompaction}.
+ */
+ myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
+ MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
+ myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
+ writeAgainThread.join();
+
+ assertTrue(memStoreSnapshot.getCellsCount() == 2);
+ assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
+ VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
+ assertTrue(segments.getNumOfSegments() == 1);
+ assertTrue(
+ ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
+ assertTrue(segments.getNumOfCells() == 2);
+ assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
+ assertTrue(exceptionRef.get() == null);
+ assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+ }
+
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
@@ -2286,4 +2506,293 @@ public class TestHStore {
}
}
+
+ public static class MyCompactingMemStore4 extends CompactingMemStore {
+ private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
+ /**
+ * {@link CompactingMemStore#flattenOneSegment} must execute after
+ * {@link CompactingMemStore#getImmutableSegments}
+ */
+ private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * Only after {@link CompactingMemStore#flattenOneSegment} completed,
+ * {@link CompactingMemStore#swapPipelineWithNull} could execute.
+ */
+ private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
+ * snapshot thread starts {@link CompactingMemStore#snapshot},because
+ * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
+ */
+ private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
+ */
+ private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
+ private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
+ private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
+ private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
+ private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
+
+ public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
+ HStore store, RegionServicesForStores regionServices,
+ MemoryCompactionPolicy compactionPolicy) throws IOException {
+ super(conf, cellComparator, store, regionServices, compactionPolicy);
+ }
+
+ @Override
+ public VersionedSegmentsList getImmutableSegments() {
+ VersionedSegmentsList result = super.getImmutableSegments();
+ if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+ int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
+ if (currentCount <= 1) {
+ try {
+ flattenOneSegmentPreCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
+ if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+ int currentCount = swapPipelineWithNullCounter.incrementAndGet();
+ if (currentCount <= 1) {
+ try {
+ flattenOneSegmentPostCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ boolean result = super.swapPipelineWithNull(segments);
+ if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+ int currentCount = swapPipelineWithNullCounter.get();
+ if (currentCount <= 1) {
+ assertTrue(!result);
+ }
+ if (currentCount == 2) {
+ assertTrue(result);
+ }
+ }
+ return result;
+
+ }
+
+ @Override
+ public void flattenOneSegment(long requesterVersion, Action action) {
+ int currentCount = flattenOneSegmentCounter.incrementAndGet();
+ if (currentCount <= 1) {
+ try {
+ /**
+ * {@link CompactingMemStore#snapshot} could start.
+ */
+ snapShotStartCyclicCyclicBarrier.await();
+ flattenOneSegmentPreCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ super.flattenOneSegment(requesterVersion, action);
+ if (currentCount <= 1) {
+ try {
+ flattenOneSegmentPostCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ protected boolean setInMemoryCompactionFlag() {
+ boolean result = super.setInMemoryCompactionFlag();
+ assertTrue(result);
+ setInMemoryCompactionFlagCounter.incrementAndGet();
+ return result;
+ }
+
+ @Override
+ void inMemoryCompaction() {
+ try {
+ super.inMemoryCompaction();
+ } finally {
+ try {
+ inMemoryCompactionEndCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+
+ }
+
+ public static class MyCompactingMemStore5 extends CompactingMemStore {
+ private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
+ private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
+ /**
+ * {@link CompactingMemStore#flattenOneSegment} must execute after
+ * {@link CompactingMemStore#getImmutableSegments}
+ */
+ private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * Only after {@link CompactingMemStore#flattenOneSegment} completed,
+ * {@link CompactingMemStore#swapPipelineWithNull} could execute.
+ */
+ private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
+ * snapshot thread starts {@link CompactingMemStore#snapshot},because
+ * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
+ */
+ private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
+ */
+ private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
+ private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
+ private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
+ private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
+ private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
+ /**
+ * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
+ * thread could start.
+ */
+ private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
+ * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
+ * execute,and in memory compact thread would exit,because we expect that in memory compact
+ * executing only once.
+ */
+ private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
+
+ public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
+ HStore store, RegionServicesForStores regionServices,
+ MemoryCompactionPolicy compactionPolicy) throws IOException {
+ super(conf, cellComparator, store, regionServices, compactionPolicy);
+ }
+
+ @Override
+ public VersionedSegmentsList getImmutableSegments() {
+ VersionedSegmentsList result = super.getImmutableSegments();
+ if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+ int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
+ if (currentCount <= 1) {
+ try {
+ flattenOneSegmentPreCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ return result;
+ }
+
+ @Override
+ protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
+ if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+ int currentCount = swapPipelineWithNullCounter.incrementAndGet();
+ if (currentCount <= 1) {
+ try {
+ flattenOneSegmentPostCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (currentCount == 2) {
+ try {
+ /**
+ * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
+ * writeAgain thread could start.
+ */
+ writeMemStoreAgainStartCyclicBarrier.await();
+ /**
+ * Only the writeAgain thread completes, retry
+ * {@link CompactingMemStore#swapPipelineWithNull} would execute.
+ */
+ writeMemStoreAgainEndCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+ boolean result = super.swapPipelineWithNull(segments);
+ if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+ int currentCount = swapPipelineWithNullCounter.get();
+ if (currentCount <= 1) {
+ assertTrue(!result);
+ }
+ if (currentCount == 2) {
+ assertTrue(result);
+ }
+ }
+ return result;
+
+ }
+
+ @Override
+ public void flattenOneSegment(long requesterVersion, Action action) {
+ int currentCount = flattenOneSegmentCounter.incrementAndGet();
+ if (currentCount <= 1) {
+ try {
+ /**
+ * {@link CompactingMemStore#snapshot} could start.
+ */
+ snapShotStartCyclicCyclicBarrier.await();
+ flattenOneSegmentPreCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ super.flattenOneSegment(requesterVersion, action);
+ if (currentCount <= 1) {
+ try {
+ flattenOneSegmentPostCyclicBarrier.await();
+ /**
+ * Only the writeAgain thread completes, in memory compact thread would exit,because we
+ * expect that in memory compact executing only once.
+ */
+ writeMemStoreAgainEndCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+
+ @Override
+ protected boolean setInMemoryCompactionFlag() {
+ boolean result = super.setInMemoryCompactionFlag();
+ int count = setInMemoryCompactionFlagCounter.incrementAndGet();
+ if (count <= 1) {
+ assertTrue(result);
+ }
+ if (count == 2) {
+ assertTrue(!result);
+ }
+ return result;
+ }
+
+ @Override
+ void inMemoryCompaction() {
+ try {
+ super.inMemoryCompaction();
+ } finally {
+ try {
+ inMemoryCompactionEndCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+ }
}