You are viewing a plain text version of this content. The canonical link for it ("here" in the original page) was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/12/18 04:18:08 UTC

[hbase] branch master updated: HBASE-26488 Memory leak when MemStore retry flushing (#3899)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new da616c0  HBASE-26488 Memory leak when MemStore retry flushing (#3899)
da616c0 is described below

commit da616c00c73977067236ba1d60277586c3625691
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Dec 18 12:17:19 2021 +0800

    HBASE-26488 Memory leak when MemStore retry flushing (#3899)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HStore.java   |   5 -
 .../hbase/regionserver/ImmutableSegment.java       |   4 +
 .../hbase/regionserver/MemStoreSnapshot.java       |  42 ++++---
 .../hadoop/hbase/regionserver/TestHStore.java      | 132 +++++++++++++++++++--
 4 files changed, 148 insertions(+), 35 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0ee7b57..1555bbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2404,7 +2404,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       long snapshotId = -1; // -1 means do not drop
       if (dropMemstoreSnapshot && snapshot != null) {
         snapshotId = snapshot.getId();
-        snapshot.close();
       }
       HStore.this.updateStorefiles(storeFiles, snapshotId);
     }
@@ -2415,10 +2414,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     @Override
     public void abort() throws IOException {
       if (snapshot != null) {
-        //We need to close the snapshot when aborting, otherwise, the segment scanner
-        //won't be closed. If we are using MSLAB, the chunk referenced by those scanners
-        //can't be released, thus memory leak
-        snapshot.close();
         HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 8c426bc..972684f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -85,6 +85,10 @@ public abstract class ImmutableSegment extends Segment {
     return res;
   }
 
+  /**
+   * We create a new {@link SnapshotSegmentScanner} to increase the reference count of
+   * {@link MemStoreLABImpl} used by this segment.
+   */
   List<KeyValueScanner> getSnapshotScanners() {
     return Collections.singletonList(new SnapshotSegmentScanner(this));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index 3b34828..07eb64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -17,31 +17,38 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.List;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import java.io.Closeable;
-import java.util.List;
+
 /**
- * Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
- * count of cells in it and total memory size occupied by all the cells, timestamp information of
- * all the cells and a scanner to read all cells in it.
+ * {@link MemStoreSnapshot} is a Context Object to hold details of the snapshot taken on a MemStore.
+ * Details include the snapshot's identifier, count of cells in it and total memory size occupied by
+ * all the cells, timestamp information of all the cells and the snapshot immutableSegment.
+ * <p>
+ * NOTE:Every time when {@link MemStoreSnapshot#getScanners} is called, we create new
+ * {@link SnapshotSegmentScanner}s on the {@link MemStoreSnapshot#snapshotImmutableSegment},and
+ * {@link Segment#incScannerCount} is invoked in the {@link SnapshotSegmentScanner} ctor to increase
+ * the reference count of {@link MemStoreLAB} which used by
+ * {@link MemStoreSnapshot#snapshotImmutableSegment}, so after we finish using these scanners, we
+ * must call their close method to invoke {@link Segment#decScannerCount}.
  */
 @InterfaceAudience.Private
-public class MemStoreSnapshot implements Closeable {
+public class MemStoreSnapshot {
   private final long id;
   private final int cellsCount;
   private final MemStoreSize memStoreSize;
   private final TimeRangeTracker timeRangeTracker;
-  private final List<KeyValueScanner> scanners;
   private final boolean tagsPresent;
+  private final ImmutableSegment snapshotImmutableSegment;
 
   public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
     this.id = id;
     this.cellsCount = snapshot.getCellsCount();
     this.memStoreSize = snapshot.getMemStoreSize();
     this.timeRangeTracker = snapshot.getTimeRangeTracker();
-    this.scanners = snapshot.getSnapshotScanners();
     this.tagsPresent = snapshot.isTagsPresent();
+    this.snapshotImmutableSegment = snapshot;
   }
 
   /**
@@ -74,10 +81,16 @@ public class MemStoreSnapshot implements Closeable {
   }
 
   /**
-   * @return {@link KeyValueScanner} for iterating over the snapshot
+   * Create new {@link SnapshotSegmentScanner}s for iterating over the snapshot. <br/>
+   * NOTE:Here when create new {@link SnapshotSegmentScanner}s, {@link Segment#incScannerCount} is
+   * invoked in the {@link SnapshotSegmentScanner} ctor,so after we use these
+   * {@link SnapshotSegmentScanner}s, we must call {@link SnapshotSegmentScanner#close} to invoke
+   * {@link Segment#decScannerCount}.
+   * @return {@link KeyValueScanner}s(Which type is {@link SnapshotSegmentScanner}) for iterating
+   *         over the snapshot.
    */
   public List<KeyValueScanner> getScanners() {
-    return scanners;
+    return snapshotImmutableSegment.getSnapshotScanners();
   }
 
   /**
@@ -86,13 +99,4 @@ public class MemStoreSnapshot implements Closeable {
   public boolean isTagsPresent() {
     return this.tagsPresent;
   }
-
-  @Override
-  public void close() {
-    if (this.scanners != null) {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.close();
-      }
-    }
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 4e373a8..24c3ccd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -783,11 +784,12 @@ public class TestHStore {
     }
   }
 
-  private static void flushStore(HStore store, long id) throws IOException {
+  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
     StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+    return storeFlushCtx;
   }
 
   /**
@@ -2222,7 +2224,7 @@ public class TestHStore {
       flushThread.join();
 
       if (myDefaultMemStore.shouldWait) {
-        SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
+        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
         MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
         assertTrue(memStoreLAB.isClosed());
         assertTrue(!memStoreLAB.chunks.isEmpty());
@@ -2249,16 +2251,16 @@ public class TestHStore {
     }
   }
 
-  private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
-    List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
+  @SuppressWarnings("unchecked")
+  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
+    List<T> resultScanners = new ArrayList<T>();
     for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
-      if (keyValueScanner instanceof SegmentScanner) {
-        segmentScanners.add((SegmentScanner) keyValueScanner);
+      if (keyValueScannerClass.isInstance(keyValueScanner)) {
+        resultScanners.add((T) keyValueScanner);
       }
     }
-
-    assertTrue(segmentScanners.size() == 1);
-    return segmentScanners.get(0);
+    assertTrue(resultScanners.size() == 1);
+    return resultScanners.get(0);
   }
 
   @Test 
@@ -2310,6 +2312,116 @@ public class TestHStore {
 
   }
 
+  /**
+   * This test is for HBASE-26488
+   */
+  @Test
+  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
+
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[9];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
+    conf.setBoolean(WALFactory.WAL_ENABLED, false);
+    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+      MyDefaultStoreFlusher.class.getName());
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
+    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
+
+    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    store.add(smallCell, memStoreSizing);
+    store.add(largeCell, memStoreSizing);
+    flushStore(store, id++);
+
+    MemStoreLABImpl memStoreLAB =
+        (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
+    assertTrue(memStoreLAB.isClosed());
+    assertTrue(memStoreLAB.getOpenScannerCount() == 0);
+    assertTrue(memStoreLAB.isReclaimed());
+    assertTrue(memStoreLAB.chunks.isEmpty());
+    StoreScanner storeScanner = null;
+    try {
+      storeScanner =
+          (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
+      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
+      assertTrue(store.memstore.size().getCellsCount() == 0);
+      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
+      assertTrue(storeScanner.currentScanners.size() == 1);
+      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
+
+      List<Cell> results = new ArrayList<>();
+      storeScanner.next(results);
+      assertEquals(2, results.size());
+      CellUtil.equals(smallCell, results.get(0));
+      CellUtil.equals(largeCell, results.get(1));
+    } finally {
+      if (storeScanner != null) {
+        storeScanner.close();
+      }
+    }
+  }
+
+
+  static class MyDefaultMemStore1 extends DefaultMemStore {
+
+    private ImmutableSegment snapshotImmutableSegment;
+
+    public MyDefaultMemStore1(Configuration conf, CellComparator c,
+        RegionServicesForStores regionServices) {
+      super(conf, c, regionServices);
+    }
+
+    @Override
+    public MemStoreSnapshot snapshot() {
+      MemStoreSnapshot result = super.snapshot();
+      this.snapshotImmutableSegment = snapshot;
+      return result;
+    }
+
+  }
+
+  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
+    private static final AtomicInteger failCounter = new AtomicInteger(1);
+    private static final AtomicInteger counter = new AtomicInteger(0);
+
+    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
+      super(conf, store);
+    }
+
+    @Override
+    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+        MonitoredTask status, ThroughputController throughputController,
+        FlushLifeCycleTracker tracker) throws IOException {
+      counter.incrementAndGet();
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+    }
+
+    @Override
+    protected void performFlush(InternalScanner scanner, final CellSink sink,
+        ThroughputController throughputController) throws IOException {
+
+      final int currentCount = counter.get();
+      CellSink newCellSink = (cell) -> {
+        if (currentCount <= failCounter.get()) {
+          throw new IOException("Simulated exception by tests");
+        }
+        sink.append(cell);
+      };
+      super.performFlush(scanner, newCellSink, throughputController);
+    }
+  }
+
   private HStoreFile mockStoreFileWithLength(long length) {
     HStoreFile sf = mock(HStoreFile.class);
     StoreFileReader sfr = mock(StoreFileReader.class);
@@ -3093,7 +3205,5 @@ public class TestHStore {
         }
       }
     }
-
-
   }
 }