You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/01/01 13:06:09 UTC

[hbase] branch branch-2.5 updated: HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new efeec91  HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)
efeec91 is described below

commit efeec919ded0626c21abf12136500da8ec933c3d
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Jan 1 20:50:52 2022 +0800

    HBASE-26494 Using RefCnt to fix the flawed MemStoreLABImpl (#3983)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/regionserver/CompactionPipeline.java     |  2 +-
 .../hbase/regionserver/ImmutableMemStoreLAB.java   | 61 +++++++------
 .../hadoop/hbase/regionserver/MemStoreLABImpl.java | 31 ++++---
 .../hadoop/hbase/regionserver/TestHStore.java      | 99 +++++++++++++++++++++-
 .../regionserver/TestStoreScannerClosure.java      |  2 +-
 5 files changed, 154 insertions(+), 41 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 2576f78..eef25f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -68,7 +68,7 @@ public class CompactionPipeline {
   /**
    * <pre>
    * Version is volatile to ensure it is atomically read when not using a lock.
-   * To indicate whether the suffix of pipleline changes:
+   * To indicate whether the suffix of pipeline changes:
    * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
    *   added at Head, {@link #version} not change.
    * 2.for {@link CompactionPipeline#swap},{@link #version} increase.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
index 0ce52b6..bc4bf78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -31,13 +33,16 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class ImmutableMemStoreLAB implements MemStoreLAB {
 
-  private final AtomicInteger openScannerCount = new AtomicInteger();
-  private volatile boolean closed = false;
+  private final RefCnt refCnt;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   private final List<MemStoreLAB> mslabs;
 
   public ImmutableMemStoreLAB(List<MemStoreLAB> mslabs) {
     this.mslabs = mslabs;
+    this.refCnt = RefCnt.create(() -> {
+      closeMSLABs();
+    });
   }
 
   @Override
@@ -85,47 +90,43 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
 
   @Override
   public void close() {
-    // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this
-    // MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of
+    // 'refCnt' here tracks the scanners opened on segments which directly refer to this
+    // MSLAB. The individual MSLABs this refers also having its own 'refCnt'. The usage of
     // the variable in close() and decScannerCount() is as as that in HeapMemstoreLAB. Here the
     // close just delegates the call to the individual MSLABs. The actual return of the chunks to
     // MSLABPool will happen within individual MSLABs only (which is at the leaf level).
     // Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some point and at that time
-    // both of them were referred by ongoing scanners. So they have > 0 'openScannerCount'. Now over
-    // the new Segment some scanners come in and this MSLABs 'openScannerCount' also goes up and
+    // both of them were referred by ongoing scanners. So they have > 0 'refCnt'. Now over
+    // the new Segment some scanners come in and this MSLABs 'refCnt' also goes up and
     // then come down on finish of scanners. Now a close() call comes to this Immutable MSLAB. As
-    // it's 'openScannerCount' is zero it will call close() on both of the Heap MSLABs. Say by that
+    // it's 'refCnt' is zero it will call close() on both of the Heap MSLABs. Say by that
     // time the old scanners on one of the MSLAB got over where as on the other, still an old
     // scanner is going on. The call close() on that MSLAB will not close it immediately but will
-    // just mark it for closure as it's 'openScannerCount' still > 0. Later once the old scan is
+    // just decrease it's 'refCnt' and it's 'refCnt' still > 0. Later once the old scan is
     // over, the decScannerCount() call will do the actual close and return of the chunks.
-    this.closed = true;
+    if (!this.closed.compareAndSet(false, true)) {
+      return;
+    }
     // When there are still on going scanners over this MSLAB, we will defer the close until all
-    // scanners finish. We will just mark it for closure. See #decScannerCount(). This will be
-    // called at end of every scan. When it is marked for closure and scanner count reached 0, we
-    // will do the actual close then.
-    checkAndCloseMSLABs(openScannerCount.get());
+    // scanners finish. We will just decrease it's 'refCnt'. See #decScannerCount(). This will be
+    // called at end of every scan. When it's 'refCnt' reached 0, we will do the actual close then.
+    this.refCnt.release();
   }
 
-  private void checkAndCloseMSLABs(int openScanners) {
-    if (openScanners == 0) {
-      for (MemStoreLAB mslab : this.mslabs) {
-        mslab.close();
-      }
+  private void closeMSLABs() {
+    for (MemStoreLAB mslab : this.mslabs) {
+      mslab.close();
     }
   }
 
   @Override
   public void incScannerCount() {
-    this.openScannerCount.incrementAndGet();
+    this.refCnt.retain();
   }
 
   @Override
   public void decScannerCount() {
-    int count = this.openScannerCount.decrementAndGet();
-    if (this.closed) {
-      checkAndCloseMSLABs(count);
-    }
+    this.refCnt.release();
   }
 
   @Override
@@ -138,5 +139,17 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
     return ChunkCreator.getInstance().isOffheap();
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  int getRefCntValue() {
+    return this.refCnt.refCnt();
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  boolean isClosed() {
+    return this.closed.get();
+  }
+
 
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 5bc47dc..33174ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -18,13 +18,13 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,8 +86,11 @@ public class MemStoreLABImpl implements MemStoreLAB {
   // This flag is for reclaiming chunks. Its set when putting chunks back to
   // pool
   private final AtomicBoolean reclaimed = new AtomicBoolean(false);
-  // Current count of open scanners which reading data from this MemStoreLAB
-  private final AtomicInteger openScannerCount = new AtomicInteger();
+  /**
+   * Its initial value is 1, so it is one bigger than the current count of open scanners which
+   * reading data from this MemStoreLAB.
+   */
+  private final RefCnt refCnt;
 
   // Used in testing
   public MemStoreLABImpl() {
@@ -100,6 +104,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
     Preconditions.checkArgument(maxAlloc <= dataChunkSize,
         MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
+    this.refCnt = RefCnt.create(() -> {
+      recycleChunks();
+    });
 
     // if user requested to work with MSLABs (whether on- or off-heap), then the
     // immutable segments are going to use CellChunkMap as their index
@@ -264,14 +271,13 @@ public class MemStoreLABImpl implements MemStoreLAB {
     }
     // We could put back the chunks to pool for reusing only when there is no
     // opening scanner which will read their data
-    int count  = openScannerCount.get();
-    if(count == 0) {
-      recycleChunks();
-    }
+    this.refCnt.release();
   }
 
-  int getOpenScannerCount() {
-    return this.openScannerCount.get();
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  int getRefCntValue() {
+    return this.refCnt.refCnt();
   }
 
   /**
@@ -279,7 +285,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
    */
   @Override
   public void incScannerCount() {
-    this.openScannerCount.incrementAndGet();
+    this.refCnt.retain();
   }
 
   /**
@@ -287,10 +293,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
    */
   @Override
   public void decScannerCount() {
-    int count = this.openScannerCount.decrementAndGet();
-    if (this.closed.get() && count == 0) {
-      recycleChunks();
-    }
+    this.refCnt.release();
   }
 
   private void recycleChunks() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 9009cb2..2fdd627 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -2361,7 +2361,7 @@ public class TestHStore {
     MemStoreLABImpl memStoreLAB =
         (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
     assertTrue(memStoreLAB.isClosed());
-    assertTrue(memStoreLAB.getOpenScannerCount() == 0);
+    assertTrue(memStoreLAB.getRefCntValue() == 0);
     assertTrue(memStoreLAB.isReclaimed());
     assertTrue(memStoreLAB.chunks.isEmpty());
     StoreScanner storeScanner = null;
@@ -2436,6 +2436,103 @@ public class TestHStore {
     }
   }
 
+  /**
+   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
+   */
+  @Test
+  public void testImmutableMemStoreLABRefCnt() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[9];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
+    final Cell smallCell2 = createCell(qf3, timestamp, seqId+1, smallValue);
+    final Cell largeCell2 = createCell(qf4, timestamp, seqId+1, largeValue);
+    final Cell smallCell3 = createCell(qf5, timestamp, seqId+2, smallValue);
+    final Cell largeCell3 = createCell(qf6, timestamp, seqId+2, largeValue);
+
+    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
+    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
+    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
+    int flushByteSize = firstWriteCellByteSize - 2;
+
+    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
+    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+    conf.setBoolean(WALFactory.WAL_ENABLED, false);
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
+    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+    myCompactingMemStore.allowCompaction.set(false);
+
+    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    store.add(smallCell1, memStoreSizing);
+    store.add(largeCell1, memStoreSizing);
+    store.add(smallCell2, memStoreSizing);
+    store.add(largeCell2, memStoreSizing);
+    store.add(smallCell3, memStoreSizing);
+    store.add(largeCell3, memStoreSizing);
+    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
+    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
+    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
+    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
+    for (ImmutableSegment segment : segments) {
+      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
+    }
+    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
+    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+      assertTrue(memStoreLAB.getRefCntValue() == 2);
+    }
+
+    myCompactingMemStore.allowCompaction.set(true);
+    myCompactingMemStore.flushInMemory();
+
+    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
+    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
+    ImmutableMemStoreLAB immutableMemStoreLAB =
+        (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
+    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+      assertTrue(memStoreLAB.getRefCntValue() == 2);
+    }
+
+    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
+    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+      assertTrue(memStoreLAB.getRefCntValue() == 2);
+    }
+    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
+    for (KeyValueScanner scanner : scanners1) {
+      scanner.close();
+    }
+    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+      assertTrue(memStoreLAB.getRefCntValue() == 1);
+    }
+    for (KeyValueScanner scanner : scanners2) {
+      scanner.close();
+    }
+    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+      assertTrue(memStoreLAB.getRefCntValue() == 1);
+    }
+    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
+    flushStore(store, id++);
+    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+      assertTrue(memStoreLAB.getRefCntValue() == 0);
+    }
+    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
+    assertTrue(immutableMemStoreLAB.isClosed());
+    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
+      assertTrue(memStoreLAB.isClosed());
+      assertTrue(memStoreLAB.isReclaimed());
+      assertTrue(memStoreLAB.chunks.isEmpty());
+    }
+  }
+
   private HStoreFile mockStoreFileWithLength(long length) {
     HStoreFile sf = mock(HStoreFile.class);
     StoreFileReader sfr = mock(StoreFileReader.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index 23b1693..f20691b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -160,7 +160,7 @@ public class TestStoreScannerClosure {
           memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
           if (memStoreLAB != null) {
             // There should be no unpooled chunks
-            int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
+            int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue();
             assertTrue("The memstore should not have unpooled chunks", refCount == 0);
           }
         }