You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/12/18 04:28:03 UTC

[hbase] branch branch-2.5 updated (5d56c80 -> 482ef69)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    from 5d56c80  HBASE-26601 maven-gpg-plugin failing with "Inappropriate ioctl for device"
     new d072a2e  HBASE-26340 TableSplit returns false size under 1MB (#3872)
     new 482ef69  HBASE-26488 Memory leak when MemStore retry flushing (#3899)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/hbase/regionserver/HRegionServer.java   |   6 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   5 -
 .../hbase/regionserver/ImmutableSegment.java       |   4 +
 .../hbase/regionserver/MemStoreSnapshot.java       |  42 ++++---
 .../hadoop/hbase/regionserver/TestHStore.java      | 132 +++++++++++++++++++--
 5 files changed, 153 insertions(+), 36 deletions(-)

[hbase] 01/02: HBASE-26340 TableSplit returns false size under 1MB (#3872)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d072a2e694127476e19192bfe470902d0e418175
Author: Norbert Kalmar <nk...@apache.org>
AuthorDate: Sat Dec 18 04:58:16 2021 +0100

    HBASE-26340 TableSplit returns false size under 1MB (#3872)
    
    Signed-off-by: Peter Somogyi <psomogyi@apache.org
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../java/org/apache/hadoop/hbase/regionserver/HRegionServer.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 41d13b7..047345c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1748,6 +1748,7 @@ public class HRegionServer extends Thread implements
     int maxCompactedStoreFileRefCount = 0;
     int storeUncompressedSizeMB = 0;
     int storefileSizeMB = 0;
+    long storefileSizeByte = 0L;
     int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
     long storefileIndexSizeKB = 0;
     int rootLevelIndexSizeKB = 0;
@@ -1765,7 +1766,7 @@ public class HRegionServer extends Thread implements
       maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount,
         currentMaxCompactedStoreFileRefCount);
       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
-      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
+      storefileSizeByte += store.getStorefilesSize();
       //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
       storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
       CompactionProgress progress = store.getCompactionProgress();
@@ -1777,6 +1778,9 @@ public class HRegionServer extends Thread implements
       totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
       totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
     }
+    //HBASE-26340 Fix false "0" size under 1MB
+    storefileSizeMB = storefileSizeByte > 0 && storefileSizeByte <= 1024 * 1024
+       ? 1 : (int) storefileSizeByte / 1024 / 1024;
 
     HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
     float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());

[hbase] 02/02: HBASE-26488 Memory leak when MemStore retry flushing (#3899)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 482ef6904e0b4076b17d0694f96e8350259b523f
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Dec 18 12:17:19 2021 +0800

    HBASE-26488 Memory leak when MemStore retry flushing (#3899)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HStore.java   |   5 -
 .../hbase/regionserver/ImmutableSegment.java       |   4 +
 .../hbase/regionserver/MemStoreSnapshot.java       |  42 ++++---
 .../hadoop/hbase/regionserver/TestHStore.java      | 132 +++++++++++++++++++--
 4 files changed, 148 insertions(+), 35 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8937d34..0213827 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2404,7 +2404,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       long snapshotId = -1; // -1 means do not drop
       if (dropMemstoreSnapshot && snapshot != null) {
         snapshotId = snapshot.getId();
-        snapshot.close();
       }
       HStore.this.updateStorefiles(storeFiles, snapshotId);
     }
@@ -2415,10 +2414,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     @Override
     public void abort() throws IOException {
       if (snapshot != null) {
-        //We need to close the snapshot when aborting, otherwise, the segment scanner
-        //won't be closed. If we are using MSLAB, the chunk referenced by those scanners
-        //can't be released, thus memory leak
-        snapshot.close();
         HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 8c426bc..972684f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -85,6 +85,10 @@ public abstract class ImmutableSegment extends Segment {
     return res;
   }
 
+  /**
+   * We create a new {@link SnapshotSegmentScanner} to increase the reference count of
+   * {@link MemStoreLABImpl} used by this segment.
+   */
   List<KeyValueScanner> getSnapshotScanners() {
     return Collections.singletonList(new SnapshotSegmentScanner(this));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index 3b34828..07eb64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -17,31 +17,38 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.List;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import java.io.Closeable;
-import java.util.List;
+
 /**
- * Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
- * count of cells in it and total memory size occupied by all the cells, timestamp information of
- * all the cells and a scanner to read all cells in it.
+ * {@link MemStoreSnapshot} is a Context Object to hold details of the snapshot taken on a MemStore.
+ * Details include the snapshot's identifier, count of cells in it and total memory size occupied by
+ * all the cells, timestamp information of all the cells and the snapshot immutableSegment.
+ * <p>
+ * NOTE:Every time when {@link MemStoreSnapshot#getScanners} is called, we create new
+ * {@link SnapshotSegmentScanner}s on the {@link MemStoreSnapshot#snapshotImmutableSegment},and
+ * {@link Segment#incScannerCount} is invoked in the {@link SnapshotSegmentScanner} ctor to increase
+ * the reference count of {@link MemStoreLAB} which used by
+ * {@link MemStoreSnapshot#snapshotImmutableSegment}, so after we finish using these scanners, we
+ * must call their close method to invoke {@link Segment#decScannerCount}.
  */
 @InterfaceAudience.Private
-public class MemStoreSnapshot implements Closeable {
+public class MemStoreSnapshot {
   private final long id;
   private final int cellsCount;
   private final MemStoreSize memStoreSize;
   private final TimeRangeTracker timeRangeTracker;
-  private final List<KeyValueScanner> scanners;
   private final boolean tagsPresent;
+  private final ImmutableSegment snapshotImmutableSegment;
 
   public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
     this.id = id;
     this.cellsCount = snapshot.getCellsCount();
     this.memStoreSize = snapshot.getMemStoreSize();
     this.timeRangeTracker = snapshot.getTimeRangeTracker();
-    this.scanners = snapshot.getSnapshotScanners();
     this.tagsPresent = snapshot.isTagsPresent();
+    this.snapshotImmutableSegment = snapshot;
   }
 
   /**
@@ -74,10 +81,16 @@ public class MemStoreSnapshot implements Closeable {
   }
 
   /**
-   * @return {@link KeyValueScanner} for iterating over the snapshot
+   * Create new {@link SnapshotSegmentScanner}s for iterating over the snapshot. <br/>
+   * NOTE:Here when create new {@link SnapshotSegmentScanner}s, {@link Segment#incScannerCount} is
+   * invoked in the {@link SnapshotSegmentScanner} ctor,so after we use these
+   * {@link SnapshotSegmentScanner}s, we must call {@link SnapshotSegmentScanner#close} to invoke
+   * {@link Segment#decScannerCount}.
+   * @return {@link KeyValueScanner}s(Which type is {@link SnapshotSegmentScanner}) for iterating
+   *         over the snapshot.
    */
   public List<KeyValueScanner> getScanners() {
-    return scanners;
+    return snapshotImmutableSegment.getSnapshotScanners();
   }
 
   /**
@@ -86,13 +99,4 @@ public class MemStoreSnapshot implements Closeable {
   public boolean isTagsPresent() {
     return this.tagsPresent;
   }
-
-  @Override
-  public void close() {
-    if (this.scanners != null) {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.close();
-      }
-    }
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 2f5350f..9009cb2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -784,11 +785,12 @@ public class TestHStore {
     }
   }
 
-  private static void flushStore(HStore store, long id) throws IOException {
+  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
     StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
     storeFlushCtx.prepare();
     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+    return storeFlushCtx;
   }
 
   /**
@@ -2236,7 +2238,7 @@ public class TestHStore {
       flushThread.join();
 
       if (myDefaultMemStore.shouldWait) {
-        SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
+        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
         MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
         assertTrue(memStoreLAB.isClosed());
         assertTrue(!memStoreLAB.chunks.isEmpty());
@@ -2263,16 +2265,16 @@ public class TestHStore {
     }
   }
 
-  private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
-    List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
+  @SuppressWarnings("unchecked")
+  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
+    List<T> resultScanners = new ArrayList<T>();
     for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
-      if (keyValueScanner instanceof SegmentScanner) {
-        segmentScanners.add((SegmentScanner) keyValueScanner);
+      if (keyValueScannerClass.isInstance(keyValueScanner)) {
+        resultScanners.add((T) keyValueScanner);
       }
     }
-
-    assertTrue(segmentScanners.size() == 1);
-    return segmentScanners.get(0);
+    assertTrue(resultScanners.size() == 1);
+    return resultScanners.get(0);
   }
 
   @Test 
@@ -2324,6 +2326,116 @@ public class TestHStore {
 
   }
 
+  /**
+   * This test is for HBASE-26488
+   */
+  @Test
+  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
+
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[9];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
+    conf.setBoolean(WALFactory.WAL_ENABLED, false);
+    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+      MyDefaultStoreFlusher.class.getName());
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
+    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
+
+    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    store.add(smallCell, memStoreSizing);
+    store.add(largeCell, memStoreSizing);
+    flushStore(store, id++);
+
+    MemStoreLABImpl memStoreLAB =
+        (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
+    assertTrue(memStoreLAB.isClosed());
+    assertTrue(memStoreLAB.getOpenScannerCount() == 0);
+    assertTrue(memStoreLAB.isReclaimed());
+    assertTrue(memStoreLAB.chunks.isEmpty());
+    StoreScanner storeScanner = null;
+    try {
+      storeScanner =
+          (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
+      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
+      assertTrue(store.memstore.size().getCellsCount() == 0);
+      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
+      assertTrue(storeScanner.currentScanners.size() == 1);
+      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
+
+      List<Cell> results = new ArrayList<>();
+      storeScanner.next(results);
+      assertEquals(2, results.size());
+      CellUtil.equals(smallCell, results.get(0));
+      CellUtil.equals(largeCell, results.get(1));
+    } finally {
+      if (storeScanner != null) {
+        storeScanner.close();
+      }
+    }
+  }
+
+
+  static class MyDefaultMemStore1 extends DefaultMemStore {
+
+    private ImmutableSegment snapshotImmutableSegment;
+
+    public MyDefaultMemStore1(Configuration conf, CellComparator c,
+        RegionServicesForStores regionServices) {
+      super(conf, c, regionServices);
+    }
+
+    @Override
+    public MemStoreSnapshot snapshot() {
+      MemStoreSnapshot result = super.snapshot();
+      this.snapshotImmutableSegment = snapshot;
+      return result;
+    }
+
+  }
+
+  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
+    private static final AtomicInteger failCounter = new AtomicInteger(1);
+    private static final AtomicInteger counter = new AtomicInteger(0);
+
+    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
+      super(conf, store);
+    }
+
+    @Override
+    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+        MonitoredTask status, ThroughputController throughputController,
+        FlushLifeCycleTracker tracker) throws IOException {
+      counter.incrementAndGet();
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+    }
+
+    @Override
+    protected void performFlush(InternalScanner scanner, final CellSink sink,
+        ThroughputController throughputController) throws IOException {
+
+      final int currentCount = counter.get();
+      CellSink newCellSink = (cell) -> {
+        if (currentCount <= failCounter.get()) {
+          throw new IOException("Simulated exception by tests");
+        }
+        sink.append(cell);
+      };
+      super.performFlush(scanner, newCellSink, throughputController);
+    }
+  }
+
   private HStoreFile mockStoreFileWithLength(long length) {
     HStoreFile sf = mock(HStoreFile.class);
     StoreFileReader sfr = mock(StoreFileReader.class);
@@ -3107,7 +3219,5 @@ public class TestHStore {
         }
       }
     }
-
-
   }
 }