You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/25 13:40:10 UTC

[hbase] branch branch-2 updated: HBASE-26465 MemStoreLAB may be released early when its SegmentScanner is scanning (#3859)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 5101f37  HBASE-26465 MemStoreLAB may be released early when its SegmentScanner is scanning (#3859)
5101f37 is described below

commit 5101f372e46cca30e64e55b99eb0b5e394409c98
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Nov 25 20:32:28 2021 +0800

    HBASE-26465 MemStoreLAB may be released early when its SegmentScanner is scanning (#3859)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Reviewed-by: Anoop Sam John <an...@apache.org>
---
 .../hbase/regionserver/AbstractMemStore.java       |   6 +
 .../hbase/regionserver/CompactingMemStore.java     |   3 +
 .../hadoop/hbase/regionserver/DefaultMemStore.java |   9 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  23 ++-
 .../hadoop/hbase/regionserver/MemStoreLABImpl.java |  23 ++-
 .../hadoop/hbase/regionserver/TestHStore.java      | 225 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/TestMemStoreLAB.java |   5 +-
 .../regionserver/TestStoreScannerClosure.java      |   4 +-
 8 files changed, 278 insertions(+), 20 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 4b923ff..56dab21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -232,6 +232,8 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
   /**
+   * This method is protected under {@link HStore#lock} write lock,<br>
+   * and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br>
    * The passed snapshot was successfully persisted; it can be let go.
    * @param id Id of the snapshot to clean out.
    * @see MemStore#snapshot()
@@ -245,6 +247,10 @@ public abstract class AbstractMemStore implements MemStore {
     }
     // OK. Passed in snapshot is same as current snapshot. If not-empty,
     // create a new snapshot and let the old one go.
+    doClearSnapShot();
+  }
+
+  protected void doClearSnapShot() {
     Segment oldSnapshot = this.snapshot;
     if (!this.snapshot.isEmpty()) {
       this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index a9683ac..eac5fc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -394,6 +394,9 @@ public class CompactingMemStore extends AbstractMemStore {
     return Bytes.toString(getFamilyNameInBytes());
   }
 
+  /**
+   * This method is protected under {@link HStore#lock} read lock.
+   */
   @Override
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     MutableSegment activeTmp = getActive();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index e38c5a3..76bef7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -132,16 +132,21 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   @Override
-  /*
+  /**
+   * This method is protected under {@link HStore#lock} read lock. <br>
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     List<KeyValueScanner> list = new ArrayList<>();
     addToScanners(getActive(), readPt, list);
-    addToScanners(snapshot.getAllSegments(), readPt, list);
+    addToScanners(getSnapshotSegments(), readPt, list);
     return list;
   }
 
+  protected List<Segment> getSnapshotSegments() {
+    return snapshot.getAllSegments();
+  }
+
   @Override
   protected List<Segment> getSegments() throws IOException {
     List<Segment> list = new ArrayList<>(2);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7aa55e0..df49e39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -150,8 +150,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
 
   private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
-
-  protected final MemStore memstore;
+  /**
+   * TODO:After making the {@link DefaultMemStore} extensible in {@link HStore} by HBASE-26476,we
+   * change it back to final.
+   */
+  protected MemStore memstore;
   // This stores directory in the filesystem.
   private final HRegion region;
   protected Configuration conf;
@@ -1222,6 +1225,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     this.lock.writeLock().lock();
     try {
       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
+      /**
+       * NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
+       * close {@link DefaultMemStore#snapshot}, which may be used by
+       * {@link DefaultMemStore#getScanners}.
+       */
+      if (snapshotId > 0) {
+        this.memstore.clearSnapshot(snapshotId);
+      }
     } finally {
       // We need the lock, as long as we are updating the storeFiles
       // or changing the memstore. Let us release it before calling
@@ -1230,13 +1241,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       // the lock.
       this.lock.writeLock().unlock();
     }
-    // We do not need to call clearSnapshot method inside the write lock.
-    // The clearSnapshot itself is thread safe, which can be called at the same time with other
-    // memstore operations expect snapshot and clearSnapshot. And for these two methods, in HRegion
-    // we can guarantee that there is only one onging flush, so they will be no race.
-    if (snapshotId > 0) {
-      this.memstore.clearSnapshot(snapshotId);
-    }
+
     // notify to be called here - only in case of flushes
     notifyChangedReadersObservers(sfs);
     if (LOG.isTraceEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 0d13e49..5bc47dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -81,10 +81,10 @@ public class MemStoreLABImpl implements MemStoreLAB {
 
   // This flag is for closing this instance, its set when clearing snapshot of
   // memstore
-  private volatile boolean closed = false;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
   // This flag is for reclaiming chunks. Its set when putting chunks back to
   // pool
-  private AtomicBoolean reclaimed = new AtomicBoolean(false);
+  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
   // Current count of open scanners which reading data from this MemStoreLAB
   private final AtomicInteger openScannerCount = new AtomicInteger();
 
@@ -259,7 +259,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
    */
   @Override
   public void close() {
-    this.closed = true;
+    if (!this.closed.compareAndSet(false, true)) {
+      return;
+    }
     // We could put back the chunks to pool for reusing only when there is no
     // opening scanner which will read their data
     int count  = openScannerCount.get();
@@ -286,7 +288,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
   @Override
   public void decScannerCount() {
     int count = this.openScannerCount.decrementAndGet();
-    if (this.closed && count == 0) {
+    if (this.closed.get() && count == 0) {
       recycleChunks();
     }
   }
@@ -294,6 +296,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
   private void recycleChunks() {
     if (reclaimed.compareAndSet(false, true)) {
       chunkCreator.putbackChunks(chunks);
+      chunks.clear();
     }
   }
 
@@ -409,13 +412,21 @@ public class MemStoreLABImpl implements MemStoreLAB {
     return pooledChunks;
   }
 
-  Integer getNumOfChunksReturnedToPool() {
+  Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) {
     int i = 0;
-    for (Integer id : this.chunks) {
+    for (Integer id : chunksId) {
       if (chunkCreator.isChunkInPool(id)) {
         i++;
       }
     }
     return i;
   }
+
+  boolean isReclaimed() {
+    return reclaimed.get();
+  }
+
+  boolean isClosed() {
+    return closed.get();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 1a0291a..ad40a98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -2160,6 +2161,122 @@ public class TestHStore {
     }
   }
 
+  /**
+   * <pre>
+   * This test is for HBASE-26465,
+   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
+   * concurrently. The threads sequence before HBASE-26465 is:
+   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have been added
+   *   to {@link DefaultMemStore}.
+   * 2.The flush thread stops before {@link DefaultMemStore#clearSnapshot} in
+   *   {@link HStore#updateStorefiles} after it has finished flushing the memStore to an hfile.
+   * 3.The scan thread starts and stops after {@link DefaultMemStore#getSnapshotSegments} in
+   *   {@link DefaultMemStore#getScanners}; here the scan thread gets the
+   *   {@link DefaultMemStore#snapshot} which was created by the flush thread.
+   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
+   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
+   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
+   *   are recycled.
+   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
+   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
+   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
+   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
+   *   be overwritten by other write threads,which may cause serious problem.
+   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
+   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
+   * </pre>
+   */
+  @Test
+  public void testClearSnapshotGetScannerConcurrently() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[9];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
+    conf.setBoolean(WALFactory.WAL_ENABLED, false);
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+    MyDefaultMemStore myDefaultMemStore = new MyDefaultMemStore(store.conf, store.getComparator(),
+        store.getHRegion().getRegionServicesForStores());
+    store.memstore = myDefaultMemStore;
+    myDefaultMemStore.store = store;
+
+    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    store.add(smallCell, memStoreSizing);
+    store.add(largeCell, memStoreSizing);
+
+    final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+    final Thread flushThread = new Thread(() -> {
+      try {
+        flushStore(store, id++);
+      } catch (Throwable exception) {
+        exceptionRef.set(exception);
+      }
+    });
+    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
+    flushThread.start();
+
+    String oldThreadName = Thread.currentThread().getName();
+    StoreScanner storeScanner = null;
+    try {
+      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
+
+      /*
+       * Wait until the flush thread reaches doClearSnapShot, before the snapshot is cleared.
+       */
+      myDefaultMemStore.getScannerCyclicBarrier.await();
+
+      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
+      flushThread.join();
+
+      if (myDefaultMemStore.shouldWait) {
+        SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
+        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
+        assertTrue(memStoreLAB.isClosed());
+        assertFalse(memStoreLAB.chunks.isEmpty());
+        assertFalse(memStoreLAB.isReclaimed());
+        // CellUtil.equals only returns a boolean, so it must be asserted to have any effect.
+        Cell cell1 = segmentScanner.next();
+        assertTrue(CellUtil.equals(smallCell, cell1));
+        Cell cell2 = segmentScanner.next();
+        assertTrue(CellUtil.equals(largeCell, cell2));
+        assertNull(segmentScanner.next());
+      } else {
+        List<Cell> results = new ArrayList<>();
+        storeScanner.next(results);
+        assertEquals(2, results.size());
+        assertTrue(CellUtil.equals(smallCell, results.get(0)));
+        assertTrue(CellUtil.equals(largeCell, results.get(1)));
+      }
+      assertNull(exceptionRef.get());
+    } finally {
+      if (storeScanner != null) {
+        storeScanner.close();
+      }
+      Thread.currentThread().setName(oldThreadName);
+    }
+  }
+
+  /** Returns the only {@link SegmentScanner} found in {@code storeScanner.currentScanners}. */
+  private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
+    List<SegmentScanner> segmentScanners = new ArrayList<>();
+    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
+      if (keyValueScanner instanceof SegmentScanner) {
+        segmentScanners.add((SegmentScanner) keyValueScanner);
+      }
+    }
+    assertEquals(1, segmentScanners.size());
+    return segmentScanners.get(0);
+  }
+
   @Test 
   public void testOnConfigurationChange() throws IOException {
     final int COMMON_MAX_FILES_TO_COMPACT = 10;
@@ -2861,4 +2978,112 @@ public class TestHStore {
       }
     }
   }
+
+  public static class MyDefaultMemStore extends DefaultMemStore {
+    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
+    private static final String FLUSH_THREAD_NAME = "flushMyThread";
+    /**
+     * The getScanner thread may start only after the flush thread has entered
+     * {@link DefaultMemStore#doClearSnapShot}.
+     */
+    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Used by the getScanner thread to notify the flush thread that getSnapshotSegments has
+     * completed, so {@link DefaultMemStore#doClearSnapShot} can continue.
+     */
+    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * Used by the flush thread to notify the getScanner thread that doClearSnapShot has
+     * completed, so {@link DefaultMemStore#getScanners} can continue.
+     */
+    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
+    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
+    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
+    private volatile boolean shouldWait = true; // false once flush clears under the write lock
+    private volatile HStore store = null;
+
+    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
+        RegionServicesForStores regionServices)
+        throws IOException {
+      super(conf, cellComparator, regionServices);
+    }
+
+    @Override
+    protected List<Segment> getSnapshotSegments() {
+
+      List<Segment> result = super.getSnapshotSegments();
+
+      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
+        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
+        if (currentCount == 1) {
+          if (this.shouldWait) {
+            try {
+              /*
+               * Tell the flush thread that getSnapshotSegments has completed, so its
+               * doClearSnapShot call can continue.
+               */
+              preClearSnapShotCyclicBarrier.await();
+              /*
+               * Wait for the flush thread's doClearSnapShot to complete.
+               */
+              postClearSnapShotCyclicBarrier.await();
+
+            } catch (Throwable e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+      return result;
+    }
+
+
+    @Override
+    protected void doClearSnapShot() {
+      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
+        int currentCount = clearSnapshotCounter.incrementAndGet();
+        if (currentCount == 1) {
+          try {
+            if (store.lock.isWriteLockedByCurrentThread()) {
+              shouldWait = false; // write lock is held: skip the pre/post barrier handshake
+            }
+            /*
+             * Release the getScanner thread: it may start only once the flush thread is inside
+             * doClearSnapShot.
+             */
+            getScannerCyclicBarrier.await();
+
+            if (shouldWait) {
+              /*
+               * Wait for the getScanner thread to finish getSnapshotSegments.
+               */
+              preClearSnapShotCyclicBarrier.await();
+            }
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+      super.doClearSnapShot();
+
+      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
+        int currentCount = clearSnapshotCounter.get();
+        if (currentCount == 1) {
+          if (shouldWait) {
+            try {
+              /*
+               * Tell the getScanner thread that doClearSnapShot has completed, so its
+               * getScanners call can continue.
+               */
+              postClearSnapShotCyclicBarrier.await();
+            } catch (Throwable e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+
+
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index d9b1035..0c1bad3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -24,9 +24,11 @@ import static org.junit.Assert.assertTrue;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ByteBufferKeyValue;
@@ -256,11 +258,12 @@ public class TestMemStoreLAB {
       // none of the chunkIds would have been returned back
       assertTrue("All the chunks must have been cleared",
           ChunkCreator.instance.numberOfMappedChunks() != 0);
+      Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks);
       int pooledChunksNum = mslab.getPooledChunks().size();
       // close the mslab
       mslab.close();
       // make sure all chunks where reclaimed back to pool
-      int queueLength = mslab.getNumOfChunksReturnedToPool();
+      int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds);
       assertTrue("All chunks in chunk queue should be reclaimed or removed"
           + " after mslab closed but actually: " + (pooledChunksNum-queueLength),
           pooledChunksNum-queueLength == 0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index 46e5b06..23b1693 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -160,8 +160,8 @@ public class TestStoreScannerClosure {
           memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
           if (memStoreLAB != null) {
             // There should be no unpooled chunks
-            int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
-            assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0);
+            int refCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
+            assertTrue("The memstore should not have unpooled chunks", refCount == 0);
           }
         }
       }