Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/01 15:06:41 UTC

[hbase] branch branch-2 updated: HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3779)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new e89f62e  HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3779)
e89f62e is described below

commit e89f62e611a124f5a82729ea905bf4e7a0e0be4d
Author: chenglei <ch...@apache.org>
AuthorDate: Mon Nov 1 23:05:57 2021 +0800

    HBASE-26384 Segment already flushed to hfile may still be remained in CompactingMemStore (#3779)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/regionserver/CompactingMemStore.java     |   8 +-
 .../hbase/regionserver/CompactionPipeline.java     |  65 ++-
 .../hadoop/hbase/regionserver/TestHStore.java      | 509 +++++++++++++++++++++
 3 files changed, 576 insertions(+), 6 deletions(-)
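
In short, the race fixed here is between the snapshot path, which reads the pipeline's immutable segments together with a version and then swaps that suffix out only if the version is unchanged, and in-memory flattening, which replaces a segment object in place. Before this change, replaceAtIndex did not bump the version, so the snapshot's swap could report success while the flattened segment silently stayed in the pipeline. Below is a minimal standalone sketch of that versioned-swap protocol; MiniPipeline and VersionedList are simplified stand-ins for illustration, not the actual HBase classes.

import java.util.LinkedList;
import java.util.List;

// Simplified stand-in for CompactionPipeline's versioned suffix swap (illustration only).
class MiniPipeline {
  private final LinkedList<Object> pipeline = new LinkedList<>();
  private volatile long version = 0;

  // Write path: a new segment at the head does not touch the suffix, so the version stays.
  synchronized void pushHead(Object segment) {
    pipeline.addFirst(segment);
  }

  // Snapshot path: capture the current segments together with the version they were read at.
  synchronized VersionedList getVersionedList() {
    return new VersionedList(new LinkedList<>(pipeline), version);
  }

  // The swap succeeds only if nothing changed the suffix since the versioned read;
  // otherwise the caller must re-read and retry (as pushPipelineToSnapshot does).
  synchronized boolean swap(VersionedList read) {
    if (read.version != version) {
      return false;
    }
    pipeline.removeAll(read.segments); // the real code matches the suffix by identity, see the diff below
    version++;
    return true;
  }

  // Flattening replaces a segment object in place. Bumping the version here is the essence of
  // the HBASE-26384 fix: without it a concurrent swap still sees the old version and leaves the
  // new (flattened) segment behind.
  synchronized void replaceAtIndex(int idx, Object newSegment) {
    pipeline.set(idx, newSegment);
    version++;
  }

  static final class VersionedList {
    final List<Object> segments;
    final long version;

    VersionedList(List<Object> segments, long version) {
      this.segments = segments;
      this.version = version;
    }
  }
}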

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 5da0de9..a9683ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -567,12 +567,12 @@ public class CompactingMemStore extends AbstractMemStore {
     boolean done = false;
     while (!done) {
       iterationsCnt++;
-      VersionedSegmentsList segments = pipeline.getVersionedList();
+      VersionedSegmentsList segments = getImmutableSegments();
       pushToSnapshot(segments.getStoreSegments());
       // swap can return false in case the pipeline was updated by ongoing compaction
       // and the version increase, the chance of it happenning is very low
       // In Swap: don't close segments (they are in snapshot now) and don't update the region size
-      done = pipeline.swap(segments, null, false, false);
+      done = swapPipelineWithNull(segments);
       if (iterationsCnt>2) {
         // practically it is impossible that this loop iterates more than two times
         // (because the compaction is stopped and none restarts it while in snapshot request),
@@ -585,6 +585,10 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
+  protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
+    return pipeline.swap(segments, null, false, false);
+  }
+
   private void pushToSnapshot(List<ImmutableSegment> segments) {
     if(segments.isEmpty()) return;
     if(segments.size() == 1 && !segments.get(0).isEmpty()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 62d1f59..2576f78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -64,7 +65,16 @@ public class CompactionPipeline {
   private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
   // The list is volatile to avoid reading a new allocated reference before the c'tor is executed
   private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
-  // Version is volatile to ensure it is atomically read when not using a lock
+  /**
+   * <pre>
+   * Version is volatile to ensure it is atomically read when not using a lock.
+   * It indicates whether the suffix of the pipeline has changed:
+   * 1. For {@link CompactionPipeline#pushHead(MutableSegment)}, a new {@link ImmutableSegment} is
+   *    only added at the head, so {@link #version} does not change.
+   * 2. For {@link CompactionPipeline#swap}, {@link #version} increases.
+   * 3. For {@link CompactionPipeline#replaceAtIndex}, {@link #version} increases.
+   * </pre>
+   */
   private volatile long version = 0;
 
   public CompactionPipeline(RegionServicesForStores region) {
@@ -95,7 +105,7 @@ public class CompactionPipeline {
 
   public VersionedSegmentsList getVersionedTail() {
     synchronized (pipeline){
-      List<ImmutableSegment> segmentList = new ArrayList<>();
+      ArrayList<ImmutableSegment> segmentList = new ArrayList<>();
       if(!pipeline.isEmpty()) {
         segmentList.add(0, pipeline.getLast());
       }
@@ -290,10 +300,15 @@ public class CompactionPipeline {
     return memStoreSizing.getMemStoreSize();
   }
 
+  /**
+   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
+   */
   private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
-    pipeline.removeAll(suffix);
-    if(segment != null) pipeline.addLast(segment);
+    matchAndRemoveSuffixFromPipeline(suffix);
+    if (segment != null) {
+      pipeline.addLast(segment);
+    }
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()
     // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
     // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
@@ -307,11 +322,53 @@ public class CompactionPipeline {
     }
   }
 
+  /**
+   * Checks that the {@link Segment}s in the suffix input parameter are the same, one by one from
+   * the last element to the first, as the {@link Segment}s in {@link CompactionPipeline#pipeline}.
+   * If they match, removes the suffix from {@link CompactionPipeline#pipeline}. <br/>
+   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
+   */
+  private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
+    if (suffix.isEmpty()) {
+      return;
+    }
+    if (pipeline.size() < suffix.size()) {
+      throw new IllegalStateException(
+          "CODE-BUG:pipeline size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
+              + "],pipeline size must be greater than or equal to suffix size");
+    }
+
+    ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
+    ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
+    int count = 0;
+    while (suffixIterator.hasPrevious()) {
+      Segment suffixSegment = suffixIterator.previous();
+      Segment pipelineSegment = pipelineIterator.previous();
+      if (suffixSegment != pipelineSegment) {
+        throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
+            + " is not pipeline segment:[" + pipelineSegment + "]");
+      }
+      count++;
+    }
+
+    for (int index = 1; index <= count; index++) {
+      pipeline.pollLast();
+    }
+
+  }
+
   // replacing one segment in the pipeline with a new one exactly at the same index
   // need to be called only within synchronized block
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
+      justification = "replaceAtIndex is invoked under a synchronize block so safe")
   private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
     pipeline.set(idx, newSegment);
     readOnlyCopy = new LinkedList<>(pipeline);
+    // The version increment is indeed needed, because the swap matches the segment objects in
+    // the suffix against the pipeline to decide what to remove.
+    // The flattening replaces the segment object completely (creation pattern), so
+    // swap will not proceed correctly after a concurrent flattening.
+    version++;
   }
 
   public Segment getTail() {
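
The new matchAndRemoveSuffixFromPipeline above deliberately compares segments by reference (==) rather than relying on equals()-based removal, so a suffix whose segments were swapped out by a concurrent flattening can never be removed by mistake. A standalone sketch of that matching step, using a generic LinkedList instead of the actual HBase types, could look like this:

import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;

// Sketch of identity-based suffix removal in the spirit of matchAndRemoveSuffixFromPipeline
// (simplified; must be called while holding the lock that guards the pipeline).
final class SuffixRemover {
  static <T> void removeMatchedSuffix(LinkedList<T> pipeline, List<T> suffix) {
    if (suffix.isEmpty()) {
      return;
    }
    if (pipeline.size() < suffix.size()) {
      throw new IllegalStateException("pipeline is smaller than the suffix to remove");
    }
    // Walk both lists backwards and require the same object (==), not equals(), so a segment
    // that was replaced by flattening is never mistaken for the one that was read earlier.
    ListIterator<T> suffixIt = suffix.listIterator(suffix.size());
    ListIterator<T> pipelineIt = pipeline.listIterator(pipeline.size());
    int matched = 0;
    while (suffixIt.hasPrevious()) {
      if (suffixIt.previous() != pipelineIt.previous()) {
        throw new IllegalStateException("suffix does not match the pipeline tail");
      }
      matched++;
    }
    for (int i = 0; i < matched; i++) {
      pipeline.pollLast();
    }
  }
}
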
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index b0cb1ee..3a0adb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
+import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@@ -1935,6 +1936,225 @@ public class TestHStore {
     }
   }
 
+  /**
+   * <pre>
+   * This test is for HBASE-26384, where
+   * {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
+   * execute concurrently.
+   * The thread sequence before HBASE-26384 is (the bug only exists for branch-2, and UTs are
+   * added for both branch-2 and master):
+   * 1. The {@link CompactingMemStore} size exceeds
+   *    {@link CompactingMemStore#getInmemoryFlushSize()}, so the write thread adds a new
+   *    {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline} and starts an
+   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
+   * 2. The in memory compact thread starts and then stops just before
+   *    {@link CompactingMemStore#flattenOneSegment}.
+   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
+   *    snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in memory
+   *    compact thread continues.
+   *    Assume the {@link VersionedSegmentsList#version} returned from
+   *    {@link CompactingMemStore#getImmutableSegments} is v.
+   * 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
+   *    {@link CompactionPipeline#version} is still v.
+   * 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
+   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
+   *    thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
+   *    {@link CompactionPipeline} has been replaced by
+   *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
+   *    not removed and still remains in {@link CompactionPipeline}.
+   *
+   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior:
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
+   *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
+   *    v+1.
+   * 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
+   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
+   *    fails and retries the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
+   *    again; because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
+   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
+   * </pre>
+   */
+  @Test
+  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[9];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
+    int flushByteSize = totalCellByteSize - 2;
+
+    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
+    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+
+    store.add(smallCell, new NonThreadSafeMemStoreSizing());
+    store.add(largeCell, new NonThreadSafeMemStoreSizing());
+
+    String oldThreadName = Thread.currentThread().getName();
+    try {
+      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
+      /**
+       * {@link CompactingMemStore#snapshot} must wait until the in memory compact thread enters
+       * {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
+       * would invoke {@link CompactingMemStore#stopCompaction}.
+       */
+      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
+
+      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
+      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
+
+      assertTrue(memStoreSnapshot.getCellsCount() == 2);
+      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
+      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
+      assertTrue(segments.getNumOfSegments() == 0);
+      assertTrue(segments.getNumOfCells() == 0);
+      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
+      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+    }
+  }
+
+  /**
+   * <pre>
+   * This test is for HBASE-26384, where {@link CompactingMemStore#flattenOneSegment},
+   * {@link CompactingMemStore#snapshot()} and memstore writes
+   * execute concurrently.
+   * The thread sequence before HBASE-26384 is (the bug only exists for branch-2, and UTs are
+   * added for both branch-2 and master):
+   * 1. The {@link CompactingMemStore} size exceeds
+   *    {@link CompactingMemStore#getInmemoryFlushSize()}, so the write thread adds a new
+   *    {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline} and starts an
+   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
+   * 2. The in memory compact thread starts and then stops just before
+   *    {@link CompactingMemStore#flattenOneSegment}.
+   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
+   *    snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in memory
+   *    compact thread continues.
+   *    Assume the {@link VersionedSegmentsList#version} returned from
+   *    {@link CompactingMemStore#getImmutableSegments} is v.
+   * 4. The snapshot thread stops just before {@link CompactingMemStore#swapPipelineWithNull}.
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
+   *    {@link CompactionPipeline#version} is still v.
+   * 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
+   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
+   *    thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
+   *    {@link CompactionPipeline} has been replaced by
+   *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
+   *    not removed and still remains in {@link CompactionPipeline}.
+   *
+   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior,
+   * and steps 7-8 are added to test that a new segment is added before the retry.
+   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment};
+   *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
+   *    v+1.
+   * 6. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
+   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
+   *    fails and retries; the {@link VersionedSegmentsList#version} returned from
+   *    {@link CompactingMemStore#getImmutableSegments} is now v+1.
+   * 7. The write thread continues writing to {@link CompactingMemStore} and the
+   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
+   *    so {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
+   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline};
+   *    {@link CompactionPipeline#version} is still v+1.
+   * 8. The snapshot thread continues with {@link CompactingMemStore#swapPipelineWithNull}; because
+   *    {@link CompactionPipeline#version} is still v+1,
+   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds. The new {@link ImmutableSegment}
+   *    remains at the head of {@link CompactingMemStore#pipeline}; the old one is removed by
+   *    {@link CompactingMemStore#swapPipelineWithNull}.
+   * </pre>
+   */
+  @Test
+  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[9];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
+    int flushByteSize = firstWriteCellByteSize - 2;
+
+    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
+    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+
+    store.add(smallCell, new NonThreadSafeMemStoreSizing());
+    store.add(largeCell, new NonThreadSafeMemStoreSizing());
+
+    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
+    final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
+    final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
+    final int writeAgainCellByteSize = MutableSegment.getCellLength(writeAgainCell1)
+        + MutableSegment.getCellLength(writeAgainCell2);
+    final Thread writeAgainThread = new Thread(() -> {
+      try {
+        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
+
+        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
+        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
+
+        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
+      } catch (Throwable exception) {
+        exceptionRef.set(exception);
+      }
+    });
+    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
+    writeAgainThread.start();
+
+    String oldThreadName = Thread.currentThread().getName();
+    try {
+      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
+      /**
+       * {@link CompactingMemStore#snapshot} must wait until the in memory compact thread enters
+       * {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
+       * would invoke {@link CompactingMemStore#stopCompaction}.
+       */
+      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
+      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
+      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
+      writeAgainThread.join();
+
+      assertTrue(memStoreSnapshot.getCellsCount() == 2);
+      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
+      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
+      assertTrue(segments.getNumOfSegments() == 1);
+      assertTrue(
+        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
+      assertTrue(segments.getNumOfCells() == 2);
+      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
+      assertTrue(exceptionRef.get() == null);
+      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+    }
+  }
+
   private HStoreFile mockStoreFileWithLength(long length) {
     HStoreFile sf = mock(HStoreFile.class);
     StoreFileReader sfr = mock(StoreFileReader.class);
@@ -2300,4 +2520,293 @@ public class TestHStore {
     }
 
   }
+
+  public static class MyCompactingMemStore4 extends CompactingMemStore {
+    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
+    /**
+     * {@link CompactingMemStore#flattenOneSegment} must execute after
+     * {@link CompactingMemStore#getImmutableSegments}
+     */
+    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Only after {@link CompactingMemStore#flattenOneSegment} has completed can
+     * {@link CompactingMemStore#swapPipelineWithNull} execute.
+     */
+    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Only after the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment}
+     * does the snapshot thread start {@link CompactingMemStore#snapshot}, because
+     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
+     */
+    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to stop.
+     */
+    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
+    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
+    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
+    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
+    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
+
+    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
+        HStore store, RegionServicesForStores regionServices,
+        MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, cellComparator, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    public VersionedSegmentsList getImmutableSegments() {
+      VersionedSegmentsList result = super.getImmutableSegments();
+      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
+        if (currentCount <= 1) {
+          try {
+            flattenOneSegmentPreCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+      return result;
+    }
+
+    @Override
+    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
+      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
+        if (currentCount <= 1) {
+          try {
+            flattenOneSegmentPostCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+      boolean result = super.swapPipelineWithNull(segments);
+      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+        int currentCount = swapPipelineWithNullCounter.get();
+        if (currentCount <= 1) {
+          assertTrue(!result);
+        }
+        if (currentCount == 2) {
+          assertTrue(result);
+        }
+      }
+      return result;
+
+    }
+
+    @Override
+    public void flattenOneSegment(long requesterVersion, Action action) {
+      int currentCount = flattenOneSegmentCounter.incrementAndGet();
+      if (currentCount <= 1) {
+        try {
+          /**
+           * {@link CompactingMemStore#snapshot} could start.
+           */
+          snapShotStartCyclicCyclicBarrier.await();
+          flattenOneSegmentPreCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      super.flattenOneSegment(requesterVersion, action);
+      if (currentCount <= 1) {
+        try {
+          flattenOneSegmentPostCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    @Override
+    protected boolean setInMemoryCompactionFlag() {
+      boolean result = super.setInMemoryCompactionFlag();
+      assertTrue(result);
+      setInMemoryCompactionFlagCounter.incrementAndGet();
+      return result;
+    }
+
+    @Override
+    void inMemoryCompaction() {
+      try {
+        super.inMemoryCompaction();
+      } finally {
+        try {
+          inMemoryCompactionEndCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+
+      }
+    }
+
+  }
+
+  public static class MyCompactingMemStore5 extends CompactingMemStore {
+    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
+    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
+    /**
+     * {@link CompactingMemStore#flattenOneSegment} must execute after
+     * {@link CompactingMemStore#getImmutableSegments}
+     */
+    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Only after {@link CompactingMemStore#flattenOneSegment} has completed can
+     * {@link CompactingMemStore#swapPipelineWithNull} execute.
+     */
+    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Only after the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment}
+     * does the snapshot thread start {@link CompactingMemStore#snapshot}, because
+     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
+     */
+    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to stop.
+     */
+    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
+    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
+    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
+    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
+    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
+    /**
+     * Only after the snapshot thread retries {@link CompactingMemStore#swapPipelineWithNull} can
+     * the writeAgain thread start.
+     */
+    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * This is used by the snapshot thread, the writeAgain thread and the in memory compact
+     * thread. Only after the writeAgain thread completes will
+     * {@link CompactingMemStore#swapPipelineWithNull} execute and the in memory compact thread
+     * exit, because we expect that the in memory compaction executes only once.
+     */
+    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
+
+    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
+        HStore store, RegionServicesForStores regionServices,
+        MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, cellComparator, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    public VersionedSegmentsList getImmutableSegments() {
+      VersionedSegmentsList result = super.getImmutableSegments();
+      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
+        if (currentCount <= 1) {
+          try {
+            flattenOneSegmentPreCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+      }
+
+      return result;
+    }
+
+    @Override
+    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
+      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
+        if (currentCount <= 1) {
+          try {
+            flattenOneSegmentPostCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        if (currentCount == 2) {
+          try {
+            /**
+             * Only after the snapshot thread retries
+             * {@link CompactingMemStore#swapPipelineWithNull} can the writeAgain thread start.
+             */
+            writeMemStoreAgainStartCyclicBarrier.await();
+            /**
+             * Only after the writeAgain thread completes will the retried
+             * {@link CompactingMemStore#swapPipelineWithNull} execute.
+             */
+            writeMemStoreAgainEndCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+      }
+      boolean result = super.swapPipelineWithNull(segments);
+      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
+        int currentCount = swapPipelineWithNullCounter.get();
+        if (currentCount <= 1) {
+          assertTrue(!result);
+        }
+        if (currentCount == 2) {
+          assertTrue(result);
+        }
+      }
+      return result;
+
+    }
+
+    @Override
+    public void flattenOneSegment(long requesterVersion, Action action) {
+      int currentCount = flattenOneSegmentCounter.incrementAndGet();
+      if (currentCount <= 1) {
+        try {
+          /**
+           * {@link CompactingMemStore#snapshot} could start.
+           */
+          snapShotStartCyclicCyclicBarrier.await();
+          flattenOneSegmentPreCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      super.flattenOneSegment(requesterVersion, action);
+      if (currentCount <= 1) {
+        try {
+          flattenOneSegmentPostCyclicBarrier.await();
+          /**
+           * Only after the writeAgain thread completes will the in memory compact thread exit,
+           * because we expect that the in memory compaction executes only once.
+           */
+          writeMemStoreAgainEndCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+
+      }
+    }
+
+    @Override
+    protected boolean setInMemoryCompactionFlag() {
+      boolean result = super.setInMemoryCompactionFlag();
+      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
+      if (count <= 1) {
+        assertTrue(result);
+      }
+      if (count == 2) {
+        assertTrue(!result);
+      }
+      return result;
+    }
+
+    @Override
+    void inMemoryCompaction() {
+      try {
+        super.inMemoryCompaction();
+      } finally {
+        try {
+          inMemoryCompactionEndCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+
+      }
+    }
+  }
 }
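
The MyCompactingMemStore4 and MyCompactingMemStore5 helpers above coordinate the snapshot thread, the write thread and the in memory compact thread with CyclicBarrier hand-offs so that one specific interleaving is deterministic rather than timing dependent. A tiny self-contained demo of that hand-off pattern, with print statements standing in for getImmutableSegments, flattenOneSegment and swapPipelineWithNull, might look like this:

import java.util.concurrent.CyclicBarrier;

// Tiny standalone demo of the CyclicBarrier hand-off pattern used by the tests above: each
// thread pauses at a named point until its partner arrives, which forces one interleaving.
public class BarrierHandOffDemo {
  private static final CyclicBarrier beforeFlatten = new CyclicBarrier(2);
  private static final CyclicBarrier afterFlatten = new CyclicBarrier(2);

  public static void main(String[] args) throws Exception {
    Thread compactor = new Thread(() -> {
      try {
        beforeFlatten.await();              // wait until the "snapshot" thread has read state
        System.out.println("flatten runs"); // stands in for flattenOneSegment
        afterFlatten.await();               // now the "snapshot" thread may swap
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }, "inMemoryCompactThread");
    compactor.start();

    System.out.println("snapshot reads segments"); // stands in for getImmutableSegments
    beforeFlatten.await();                          // release the compactor
    afterFlatten.await();                           // wait until flattening has finished
    System.out.println("snapshot swaps pipeline");  // stands in for swapPipelineWithNull
    compactor.join();
  }
}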