You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2018/12/06 02:57:08 UTC

hbase git commit: HBASE-21551 Memory leak when use scan with STREAM at server side

Repository: hbase
Updated Branches:
  refs/heads/master f49baf259 -> 3b854859f


HBASE-21551 Memory leak when use scan with STREAM at server side


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b854859
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b854859
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b854859

Branch: refs/heads/master
Commit: 3b854859f6fad44cbf31164374569a6ab23f3623
Parents: f49baf2
Author: huzheng <op...@gmail.com>
Authored: Wed Dec 5 22:57:49 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Thu Dec 6 10:55:42 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HStoreFile.java   |  3 +-
 .../hbase/regionserver/StoreFileReader.java     |  3 ++
 .../regionserver/TestSwitchToStreamRead.java    | 50 ++++++++++++++++++++
 3 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 4aff949..9c94990 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -126,7 +126,8 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
   private final AtomicInteger refCount = new AtomicInteger(0);
 
   // Set implementation must be of concurrent type
-  private final Set<StoreFileReader> streamReaders;
+  @VisibleForTesting
+  final Set<StoreFileReader> streamReaders;
 
   private final boolean noReadahead;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 3fbddf2..d9008b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -186,6 +186,9 @@ public class StoreFileReader {
     if (!shared) {
       try {
         reader.close(false);
+        if (this.listener != null) {
+          this.listener.storeFileReaderClosed(this);
+        }
       } catch (IOException e) {
         LOG.warn("failed to close stream reader", e);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
index 815643d..037b13e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -23,8 +23,13 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
@@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Ignore;
@@ -98,6 +105,49 @@ public class TestSwitchToStreamRead {
     UTIL.cleanupTestDir();
   }
 
+  private Set<StoreFileReader> getStreamReaders() {
+    List<HStore> stores = REGION.getStores();
+    Assert.assertEquals(1, stores.size());
+    HStore firstStore = stores.get(0);
+    Assert.assertNotNull(firstStore);
+    Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
+    Assert.assertEquals(1, storeFiles.size());
+    HStoreFile firstSToreFile = storeFiles.iterator().next();
+    Assert.assertNotNull(firstSToreFile);
+    return Collections.unmodifiableSet(firstSToreFile.streamReaders);
+  }
+
+  /**
+   * Test case for HBASE-21551: closed stream readers must be removed from the store file's set.
+   */
+  @Test
+  public void testStreamReadersCleanup() throws IOException {
+    Set<StoreFileReader> streamReaders = getStreamReaders();
+    Assert.assertEquals(0, getStreamReaders().size());
+    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
+      StoreScanner storeScanner =
+          (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
+      List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
+          .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
+          .collect(Collectors.toList());
+      Assert.assertEquals(1, sfScanners.size());
+      StoreFileScanner sfScanner = sfScanners.get(0);
+      Assert.assertFalse(sfScanner.getReader().shared);
+
+      // There should be a stream reader
+      Assert.assertEquals(1, getStreamReaders().size());
+    }
+    Assert.assertEquals(0, getStreamReaders().size());
+
+    // The streamReaders set should be cleared after the region is closed, even if some stream
+    // scanners are still open.
+    RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
+    Assert.assertNotNull(scanner);
+    Assert.assertEquals(1, getStreamReaders().size());
+    REGION.close();
+    Assert.assertEquals(0, streamReaders.size());
+  }
+
   @Test
   public void test() throws IOException {
     try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {