You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2016/07/13 02:06:37 UTC
hbase git commit: HBASE-16195 Should not add chunk into chunkQueue if
not using chunk pool in HeapMemStoreLAB
Repository: hbase
Updated Branches:
refs/heads/master 911706a87 -> 3b3c3dc02
HBASE-16195 Should not add chunk into chunkQueue if not using chunk pool in HeapMemStoreLAB
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b3c3dc0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b3c3dc0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b3c3dc0
Branch: refs/heads/master
Commit: 3b3c3dc02deaeb3c2c8ca52dd204edbf87da502f
Parents: 911706a
Author: Yu Li <li...@apache.org>
Authored: Wed Jul 13 09:33:24 2016 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Jul 13 09:33:24 2016 +0800
----------------------------------------------------------------------
.../hbase/regionserver/HeapMemStoreLAB.java | 28 +++++++-
.../hbase/regionserver/MemStoreChunkPool.java | 17 +++++
.../hbase/regionserver/TestMemStoreLAB.java | 76 +++++++++++++++++++-
3 files changed, 117 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3b3c3dc0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
index 625811a..d8fa5c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteRange;
@@ -62,9 +64,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
// allocator
+ static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
+
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
- // A queue of chunks contained by this memstore
- private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
+ // A queue of chunks contained by this memstore, used with chunk pool
+ private BlockingQueue<Chunk> chunkQueue = null;
final int chunkSize;
final int maxAlloc;
private final MemStoreChunkPool chunkPool;
@@ -87,6 +91,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
this.chunkPool = MemStoreChunkPool.getPool(conf);
+ // currently chunkQueue is only used for chunkPool
+ if (this.chunkPool != null) {
+ // set queue length to chunk pool max count to avoid keeping reference of
+ // too many non-reclaimable chunks
+ chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
+ }
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(
@@ -166,6 +176,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
* Try to retire the current chunk if it is still
* <code>c</code>. Postcondition is that curChunk.get()
* != c
+ * @param c the chunk to retire
+ * @return true if we won the race to retire the chunk
*/
private void tryRetireChunk(Chunk c) {
curChunk.compareAndSet(c, null);
@@ -197,7 +209,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
// we won race - now we need to actually do the expensive
// allocation step
c.init();
- this.chunkQueue.add(c);
+ if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+ + chunkQueue.size());
+ }
+ }
return c;
} else if (chunkPool != null) {
chunkPool.putbackChunk(c);
@@ -212,6 +229,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
return this.curChunk.get();
}
+ @VisibleForTesting
+ BlockingQueue<Chunk> getChunkQueue() {
+ return this.chunkQueue;
+ }
+
/**
* A chunk of memory out of which allocations are sliced.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/3b3c3dc0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index 6285060..81b6046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -123,6 +124,13 @@ public class MemStoreChunkPool {
return;
}
chunks.drainTo(reclaimedChunks, maxNumToPutback);
+ // clear reference of any non-reclaimable chunks
+ if (chunks.size() > 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue");
+ }
+ chunks.clear();
+ }
}
/**
@@ -217,4 +225,13 @@ public class MemStoreChunkPool {
}
}
+ int getMaxCount() {
+ return this.maxCount;
+ }
+
+ @VisibleForTesting
+ static void clearDisableFlag() {
+ chunkPoolDisabled = false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3b3c3dc0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index 170bdd4..34caf97 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -37,6 +39,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
+
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
@@ -149,7 +152,78 @@ public class TestMemStoreLAB {
}
}
-
+
+ /**
+ * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
+ * there's no memory leak (HBASE-16195)
+ * @throws Exception if any error occurred
+ */
+ @Test
+ public void testLABChunkQueue() throws Exception {
+ HeapMemStoreLAB mslab = new HeapMemStoreLAB();
+ // by default setting, there should be no chunk queue initialized
+ assertNull(mslab.getChunkQueue());
+ // reset mslab with chunk pool
+ Configuration conf = HBaseConfiguration.create();
+ conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1);
+ // set chunk size to default max alloc size, so we could easily trigger chunk retirement
+ conf.setLong(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.MAX_ALLOC_DEFAULT);
+ // reconstruct mslab
+ MemStoreChunkPool.clearDisableFlag();
+ mslab = new HeapMemStoreLAB(conf);
+ // launch multiple threads to trigger frequent chunk retirement
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < 10; i++) {
+ threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i));
+ }
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ // let it run for some time
+ Thread.sleep(1000);
+ for (Thread thread : threads) {
+ thread.interrupt();
+ }
+ boolean threadsRunning = true;
+ while (threadsRunning) {
+ for (Thread thread : threads) {
+ if (thread.isAlive()) {
+ threadsRunning = true;
+ break;
+ }
+ }
+ threadsRunning = false;
+ }
+ // close the mslab
+ mslab.close();
+ // make sure all chunks reclaimed or removed from chunk queue
+ int queueLength = mslab.getChunkQueue().size();
+ assertTrue("All chunks in chunk queue should be reclaimed or removed"
+ + " after mslab closed but actually: " + queueLength, queueLength == 0);
+ }
+
+ private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName) {
+ Thread thread = new Thread() {
+ boolean stopped = false;
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ // keep triggering chunk retirement
+ mslab.allocateBytes(HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 1);
+ }
+ }
+
+ @Override
+ public void interrupt() {
+ this.stopped = true;
+ }
+ };
+ thread.setName(threadName);
+ thread.setDaemon(true);
+ return thread;
+ }
+
private static class AllocRecord implements Comparable<AllocRecord>{
private final ByteRange alloc;
private final int size;