You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2018/12/06 02:57:08 UTC
hbase git commit: HBASE-21551 Memory leak when use scan with STREAM
at server side
Repository: hbase
Updated Branches:
refs/heads/master f49baf259 -> 3b854859f
HBASE-21551 Memory leak when use scan with STREAM at server side
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b854859
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b854859
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b854859
Branch: refs/heads/master
Commit: 3b854859f6fad44cbf31164374569a6ab23f3623
Parents: f49baf2
Author: huzheng <op...@gmail.com>
Authored: Wed Dec 5 22:57:49 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Thu Dec 6 10:55:42 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HStoreFile.java | 3 +-
.../hbase/regionserver/StoreFileReader.java | 3 ++
.../regionserver/TestSwitchToStreamRead.java | 50 ++++++++++++++++++++
3 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 4aff949..9c94990 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -126,7 +126,8 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
private final AtomicInteger refCount = new AtomicInteger(0);
// Set implementation must be of concurrent type
- private final Set<StoreFileReader> streamReaders;
+ @VisibleForTesting
+ final Set<StoreFileReader> streamReaders;
private final boolean noReadahead;
http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 3fbddf2..d9008b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -186,6 +186,9 @@ public class StoreFileReader {
if (!shared) {
try {
reader.close(false);
+ if (this.listener != null) {
+ this.listener.storeFileReaderClosed(this);
+ }
} catch (IOException e) {
LOG.warn("failed to close stream reader", e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
index 815643d..037b13e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -23,8 +23,13 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
@@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
@@ -98,6 +105,49 @@ public class TestSwitchToStreamRead {
UTIL.cleanupTestDir();
}
+ private Set<StoreFileReader> getStreamReaders() {
+ List<HStore> stores = REGION.getStores();
+ Assert.assertEquals(1, stores.size());
+ HStore firstStore = stores.get(0);
+ Assert.assertNotNull(firstStore);
+ Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
+ Assert.assertEquals(1, storeFiles.size());
+ HStoreFile firstSToreFile = storeFiles.iterator().next();
+ Assert.assertNotNull(firstSToreFile);
+ return Collections.unmodifiableSet(firstSToreFile.streamReaders);
+ }
+
+ /**
+ * Test case for HBASE-21551: stream readers must be removed from the store file once closed.
+ */
+ @Test
+ public void testStreamReadersCleanup() throws IOException {
+ Set<StoreFileReader> streamReaders = getStreamReaders();
+ Assert.assertEquals(0, getStreamReaders().size());
+ try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
+ StoreScanner storeScanner =
+ (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
+ List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
+ .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, sfScanners.size());
+ StoreFileScanner sfScanner = sfScanners.get(0);
+ Assert.assertFalse(sfScanner.getReader().shared);
+
+ // There should be a stream reader
+ Assert.assertEquals(1, getStreamReaders().size());
+ }
+ Assert.assertEquals(0, getStreamReaders().size());
+
+ // The streamReaders set should be cleared when the region is closed, even if some stream
+ // scanners are still open.
+ RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
+ Assert.assertNotNull(scanner);
+ Assert.assertEquals(1, getStreamReaders().size());
+ REGION.close();
+ Assert.assertEquals(0, streamReaders.size());
+ }
+
@Test
public void test() throws IOException {
try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {