Posted to commits@hbase.apache.org by zh...@apache.org on 2021/08/12 15:30:29 UTC

[hbase] branch branch-2 updated: HBASE-26026 HBase Write may be stuck forever when using CompactingMemStore (#3421)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 725474b  HBASE-26026 HBase Write may be stuck forever when using CompactingMemStore (#3421)
725474b is described below

commit 725474bbe3b21fcee16cda56bcf9ac5268ed2bd7
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Aug 12 23:11:19 2021 +0800

    HBASE-26026 HBase Write may be stuck forever when using CompactingMemStore (#3421)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/regionserver/AbstractMemStore.java       |   6 +-
 .../hbase/regionserver/CompactingMemStore.java     |  27 +++-
 .../hbase/regionserver/CompactionPipeline.java     |  10 +-
 .../hbase/regionserver/MemStoreCompactor.java      |   3 +-
 .../hbase/regionserver/TestCompactingMemStore.java |   8 +-
 .../hbase/regionserver/TestDefaultMemStore.java    |  21 +++-
 .../hadoop/hbase/regionserver/TestHStore.java      | 140 +++++++++++++++++++++
 7 files changed, 202 insertions(+), 13 deletions(-)
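
For context when reading the patch: CompactingMemStore accounts a write in two
steps, first bumping the active segment's data size, then inserting the cell
into its cell set (the patch's own comments below describe this). Between the
two steps, another thread can observe a segment whose data size is over the
in-memory flush threshold while its cell set is still empty. A minimal,
self-contained sketch of that window follows; the class and field names are
illustrative stand-ins, not the actual HBase types.

    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.atomic.AtomicLong;

    // Minimal model of the two-step add that opens the race window.
    public class SegmentSketch {
      final AtomicLong dataSize = new AtomicLong();      // step 1 bumps this
      final ConcurrentSkipListSet<String> cellSet =
          new ConcurrentSkipListSet<>();                 // step 2 fills this

      boolean isEmpty() {
        return cellSet.isEmpty();
      }

      void add(String cell, long size) {
        dataSize.addAndGet(size);
        // <-- another thread can see dataSize over the flush threshold here,
        //     while cellSet is still empty
        cellSet.add(cell);
      }

      public static void main(String[] args) {
        SegmentSketch seg = new SegmentSketch();
        seg.dataSize.addAndGet(9); // a writer reserved space, cell not added yet
        System.out.println("over threshold: " + (seg.dataSize.get() > 8)
            + ", looks empty: " + seg.isEmpty()); // both true: the racy state
      }
    }

Before this patch, an in-memory flush that hit this state skipped pushing the
"empty" active segment, so the segment was never replaced, and per the
HBASE-26026 analysis the pending writes then retried against it forever.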

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 556cf04..4b923ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -70,7 +70,9 @@ public abstract class AbstractMemStore implements MemStore {
 
   protected static void addToScanners(Segment segment, long readPt,
       List<KeyValueScanner> scanners) {
-    scanners.add(segment.getScanner(readPt));
+    if (!segment.isEmpty()) {
+      scanners.add(segment.getScanner(readPt));
+    }
   }
 
   protected AbstractMemStore(final Configuration conf, final CellComparator c,
@@ -156,7 +158,7 @@ public abstract class AbstractMemStore implements MemStore {
     }
   }
 
-  private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
+  protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
     Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
     boolean mslabUsed = (toAdd != cell);
     // This cell data is backed by the same byte[] where we read request in RPC(See
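
The guard added above is tiny, but it changes what getScanners returns:
segments holding no cells no longer contribute scanners. A self-contained model
of the effect (Segment and the scanner type are simplified stand-ins):

    import java.util.ArrayList;
    import java.util.List;

    // Stand-ins for Segment/KeyValueScanner, just enough to show the guard.
    public class ScannerGuardSketch {
      static class Segment {
        final List<String> cells;
        Segment(List<String> cells) { this.cells = cells; }
        boolean isEmpty() { return cells.isEmpty(); }
        String getScanner() { return "scanner over " + cells; }
      }

      static void addToScanners(Segment segment, List<String> scanners) {
        if (!segment.isEmpty()) {     // the new guard from the patch
          scanners.add(segment.getScanner());
        }
      }

      public static void main(String[] args) {
        List<String> scanners = new ArrayList<>();
        addToScanners(new Segment(List.of("kv1", "kv2")), scanners);
        addToScanners(new Segment(List.of()), scanners); // skipped entirely
        System.out.println(scanners.size());             // 1, not 2
      }
    }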
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 4ee2db9..973d5a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -453,9 +453,14 @@ public class CompactingMemStore extends AbstractMemStore {
     inMemoryCompaction();
   }
 
-  private void flushInMemory(MutableSegment currActive) {
+  protected void flushInMemory(MutableSegment currActive) {
     LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
-    pushActiveToPipeline(currActive);
+    // NOTE: Due to concurrent writes, and because we first add the cell size to
+    // currActive.getDataSize and only then add the cell to currActive.cellSet, it is possible
+    // that currActive.getDataSize cannot accommodate cellToAdd while currActive.cellSet is
+    // still empty, because pending writes have not yet added their cells to currActive.cellSet.
+    // So we must not check currActive.isEmpty here.
+    pushActiveToPipeline(currActive, false);
   }
 
   void inMemoryCompaction() {
@@ -524,7 +529,23 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   protected void pushActiveToPipeline(MutableSegment currActive) {
-    if (!currActive.isEmpty()) {
+    pushActiveToPipeline(currActive, true);
+  }
+
+  /**
+   * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
+   * concurrent writes, and because we first add the cell size to currActive.getDataSize and only
+   * then add the cell to currActive.cellSet, it is possible that currActive.getDataSize cannot
+   * accommodate cellToAdd while currActive.cellSet is still empty, because pending writes have
+   * not yet added their cells to currActive.cellSet. So
+   * {@link CompactingMemStore#flushInMemory(MutableSegment)} passes checkEmpty as false. But when
+   * {@link CompactingMemStore#snapshot} calls this method there are no pending writes, so
+   * checkEmpty can be true.
+   * @param currActive the current active segment to push to the pipeline
+   * @param checkEmpty whether to skip the push when currActive has no cells
+   */
+  protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) {
+    if (!checkEmpty || !currActive.isEmpty()) {
       pipeline.pushHead(currActive);
       resetActive();
     }
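
Why the push must be unconditional on the flushInMemory path: a writer that
fails to fit its cell into the active segment retries in a loop, and only a
successful push (followed by resetActive()) gives it a fresh segment to retry
against. The following condensed, self-contained model shows the old behavior
getting stuck; all names are illustrative, not the real method bodies.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Model of the stuck retry loop fixed by this patch.
    public class StuckWriteSketch {
      static final long FLUSH_THRESHOLD = 10;
      long activeDataSize = 12;               // already over the threshold
      boolean activeLooksEmpty = true;        // cells not inserted yet (the race)
      final AtomicBoolean flushed = new AtomicBoolean();

      boolean tryAdd(long cellSize) {
        if (activeDataSize + cellSize >= FLUSH_THRESHOLD) {
          if (flushed.compareAndSet(false, true)) {
            // old behavior: skip the push because the segment "looks" empty,
            // so the active segment is never replaced or reset
            if (!activeLooksEmpty) {
              activeDataSize = 0;             // stand-in for resetActive()
            }
          }
          return false;                       // caller must retry
        }
        activeDataSize += cellSize;
        return true;
      }

      public static void main(String[] args) {
        StuckWriteSketch memstore = new StuckWriteSketch();
        int attempts = 0;
        while (!memstore.tryAdd(3) && attempts++ < 5) {
          System.out.println("retry " + attempts + ": still over threshold");
        }
        // with the empty check in place the loop never succeeds; the patch
        // makes the push (and therefore the reset) unconditional instead
      }
    }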
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 711cfd3..194e065 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -223,10 +223,16 @@ public class CompactionPipeline {
         LOG.warn("Segment flattening failed, because versions do not match");
         return false;
       }
-      int i = 0;
+      int i = -1;
       for (ImmutableSegment s : pipeline) {
+        i++;
         if ( s.canBeFlattened() ) {
           s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
+          if (s.isEmpty()) {
+            // after s.waitForUpdates() returns there are no preceding updates, so if s has no
+            // cells we can safely skip it.
+            continue;
+          }
           // size to be updated
           MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
           ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
@@ -242,9 +248,7 @@ public class CompactionPipeline {
           LOG.debug("Compaction pipeline segment {} flattened", s);
           return true;
         }
-        i++;
       }
-
     }
     // do not update the global memstore size counter and do not increase the version,
     // because all the cells remain in place
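
One easy-to-miss detail in the flatten loop above: the index i must advance for
every segment in the pipeline, including the empty ones that are now skipped,
because it is used to replace the flattened segment at its position. Moving the
increment to the top of the loop keeps it correct across continue. A generic
sketch of the pattern:

    import java.util.Arrays;
    import java.util.List;

    // Iterate-with-skip while keeping a positional index correct: i advances
    // for every element, so 'continue' cannot desynchronize it.
    public class IndexedSkipSketch {
      public static void main(String[] args) {
        List<String> pipeline = Arrays.asList("seg0", "", "seg2");
        int i = -1;
        for (String s : pipeline) {
          i++;                        // advance first, skipped or not
          if (s.isEmpty()) {
            continue;                 // empty segment: nothing to flatten
          }
          System.out.println("flatten " + s + " at position " + i);
        }
      }
    }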
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 4df9ae0..4a99430 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -208,7 +208,8 @@ public class MemStoreCompactor {
     MemStoreSegmentsIterator iterator = null;
     List<ImmutableSegment> segments = versionedList.getStoreSegments();
     for (ImmutableSegment s : segments) {
-      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
+      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed.
+      // empty segments are skipped when the MemStoreSegmentsIterator is created below.
     }
 
     switch (action) {
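
The comment above defers empty-segment handling to the construction of the
MemStoreSegmentsIterator. As an illustrative sketch of that kind of skip (the
real iterator implementations work over segment scanners, so treat this as the
shape of the idea, not the actual code):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative filter: only non-empty segments feed the merge iterator.
    public class SkipEmptySketch {
      public static void main(String[] args) {
        List<List<String>> segments = new ArrayList<>();
        segments.add(List.of("cellA", "cellB"));
        segments.add(List.of());                 // empty segment
        List<List<String>> nonEmpty = new ArrayList<>();
        for (List<String> segment : segments) {
          if (!segment.isEmpty()) {
            nonEmpty.add(segment);               // empty ones are skipped
          }
        }
        System.out.println(nonEmpty.size());     // 1
      }
    }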
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 38fa587..297f5cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -176,11 +176,15 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     // use case 1: both kvs in kvset
     this.memstore.add(kv1.clone(), null);
     this.memstore.add(kv2.clone(), null);
-    verifyScanAcrossSnapshot2(kv1, kv2);
+    // the snapshot is empty and the active segment is not;
+    // the empty segment is skipped.
+    verifyOneScanAcrossSnapshot2(kv1, kv2);
 
     // use case 2: both kvs in snapshot
     this.memstore.snapshot();
-    verifyScanAcrossSnapshot2(kv1, kv2);
+    // the active segment is empty and the snapshot is not;
+    // the empty segment is skipped.
+    verifyOneScanAcrossSnapshot2(kv1, kv2);
 
     // use case 3: first in snapshot second in kvset
     this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 973547d..0dc35c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -255,11 +255,16 @@ public class TestDefaultMemStore {
     // use case 1: both kvs in kvset
     this.memstore.add(kv1.clone(), null);
     this.memstore.add(kv2.clone(), null);
-    verifyScanAcrossSnapshot2(kv1, kv2);
+    // the snapshot is empty and the active segment is not;
+    // the empty segment is skipped.
+    verifyOneScanAcrossSnapshot2(kv1, kv2);
 
     // use case 2: both kvs in snapshot
+    // after the snapshot the active segment is empty and the snapshot is not;
+    // the empty segment is skipped.
     this.memstore.snapshot();
-    verifyScanAcrossSnapshot2(kv1, kv2);
+    // so again exactly one scanner is expected.
+    verifyOneScanAcrossSnapshot2(kv1, kv2);
 
     // use case 3: first in snapshot second in kvset
     this.memstore = new DefaultMemStore();
@@ -288,6 +293,18 @@ public class TestDefaultMemStore {
     assertNull(scanner1.next());
   }
 
+  protected void verifyOneScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException {
+    List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
+    assertEquals(1, memstorescanners.size());
+    final KeyValueScanner scanner0 = memstorescanners.get(0);
+    scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    Cell n0 = scanner0.next();
+    Cell n1 = scanner0.next();
+    assertTrue(kv1.equals(n0));
+    assertTrue(kv2.equals(n1));
+    assertNull(scanner0.next());
+  }
+
   protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
       throws IOException {
     scanner.seek(KeyValueUtil.createFirstOnRow(new byte[]{}));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index e024eec..0e0a352 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -43,6 +43,7 @@ import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -1726,6 +1727,67 @@ public class TestHStore {
     assertArrayEquals(table, hFileContext.getTableName());
   }
 
+  @Test
+  public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[9];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
+
+    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
+    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
+    myCompactingMemStore.smallCellPostUpdateCounter.set(0);
+    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
+    myCompactingMemStore.largeCellPostUpdateCounter.set(0);
+
+    Thread smallCellThread = new Thread(() -> {
+      store.add(smallCell, new NonThreadSafeMemStoreSizing());
+    });
+    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
+    smallCellThread.start();
+
+    String oldThreadName = Thread.currentThread().getName();
+    try {
+      /**
+       * 1. smallCellThread enters CompactingMemStore.shouldFlushInMemory first; when largeCellThread
+       * enters CompactingMemStore.shouldFlushInMemory, CompactingMemStore.active.getDataSize cannot
+       * accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory returns true.
+       * <p/>
+       * 2. Only after largeCellThread finishes CompactingMemStore.flushInMemory can smallCellThread
+       * add its cell to currentActive. That is, when largeCellThread calls flushInMemory,
+       * CompactingMemStore.active has no cells.
+       */
+      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
+      store.add(largeCell, new NonThreadSafeMemStoreSizing());
+      smallCellThread.join();
+
+      for (int i = 0; i < 100; i++) {
+        long currentTimestamp = timestamp + 100 + i;
+        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
+        store.add(cell, new NonThreadSafeMemStoreSizing());
+      }
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+    }
+
+  }
+
   private HStoreFile mockStoreFileWithLength(long length) {
     HStoreFile sf = mock(HStoreFile.class);
     StoreFileReader sfr = mock(StoreFileReader.class);
@@ -1925,4 +1987,83 @@
     @Override
     public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
   }
+
+  public static class MyCompactingMemStore2 extends CompactingMemStore {
+    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
+    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
+    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
+    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
+    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
+    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
+    private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0);
+    private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0);
+
+    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
+        HStore store, RegionServicesForStores regionServices,
+        MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, cellComparator, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
+        MemStoreSizing memstoreSizing) {
+      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
+        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
+        if (currentCount <= 1) {
+          try {
+            /**
+             * smallCellThread enters super.shouldFlushInMemory first; when largeCellThread enters
+             * super.shouldFlushInMemory, currActive.getDataSize cannot accommodate cellToAdd and
+             * super.shouldFlushInMemory returns true.
+             */
+            preCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+
+      boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing);
+      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+        try {
+          preCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return returnValue;
+    }
+
+    @Override
+    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
+      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+        try {
+          /**
+           * Only after largeCellThread finishes flushInMemory can smallCellThread add its cell to
+           * currentActive. That is, when largeCellThread calls flushInMemory, currentActive has
+           * no cells.
+           */
+          postCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      super.doAdd(currentActive, cell, memstoreSizing);
+    }
+
+    @Override
+    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
+      super.flushInMemory(currentActiveMutableSegment);
+      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
+        if (largeCellPreUpdateCounter.get() <= 1) {
+          try {
+            postCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+
+  }
 }
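
A note on the test choreography above: MyCompactingMemStore2 uses two two-party
CyclicBarriers as rendezvous points to pin an exact interleaving. The
preCyclicBarrier holds largeCellThread out of shouldFlushInMemory until
smallCellThread has reserved its size, and the postCyclicBarrier holds
smallCellThread's doAdd until largeCellThread's flushInMemory has completed,
reproducing the race deterministically. A self-contained demo of the pattern
(not HBase code):

    import java.util.concurrent.CyclicBarrier;

    // Two-party barrier rendezvous: thread B's step 2 is guaranteed to run
    // after thread A's step 1, with no sleeps or polling.
    public class BarrierOrderingDemo {
      static void await(CyclicBarrier barrier) {
        try {
          barrier.await();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        CyclicBarrier rendezvous = new CyclicBarrier(2);
        Thread a = new Thread(() -> {
          System.out.println("A: step 1 done");
          await(rendezvous);          // releases the main thread exactly here
        });
        a.start();
        await(rendezvous);            // blocks until A finished step 1
        System.out.println("B: step 2 runs strictly after A's step 1");
        a.join();
      }
    }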