You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2016/07/13 02:06:37 UTC

hbase git commit: HBASE-16195 Should not add chunk into chunkQueue if not using chunk pool in HeapMemStoreLAB

Repository: hbase
Updated Branches:
  refs/heads/master 911706a87 -> 3b3c3dc02


HBASE-16195 Should not add chunk into chunkQueue if not using chunk pool in HeapMemStoreLAB


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b3c3dc0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b3c3dc0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b3c3dc0

Branch: refs/heads/master
Commit: 3b3c3dc02deaeb3c2c8ca52dd204edbf87da502f
Parents: 911706a
Author: Yu Li <li...@apache.org>
Authored: Wed Jul 13 09:33:24 2016 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Jul 13 09:33:24 2016 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/HeapMemStoreLAB.java     | 28 +++++++-
 .../hbase/regionserver/MemStoreChunkPool.java   | 17 +++++
 .../hbase/regionserver/TestMemStoreLAB.java     | 76 +++++++++++++++++++-
 3 files changed, 117 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3b3c3dc0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
index 625811a..d8fa5c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.ByteRange;
@@ -62,9 +64,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
   static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
                                                    // allocator
 
+  static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
+
   private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
-  // A queue of chunks contained by this memstore
-  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
+  // A queue of chunks contained by this memstore, used with chunk pool
+  private BlockingQueue<Chunk> chunkQueue = null;
   final int chunkSize;
   final int maxAlloc;
   private final MemStoreChunkPool chunkPool;
@@ -87,6 +91,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
     chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
     maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
     this.chunkPool = MemStoreChunkPool.getPool(conf);
+    // currently chunkQueue is only used for chunkPool
+    if (this.chunkPool != null) {
+      // set queue length to chunk pool max count to avoid keeping reference of
+      // too many non-reclaimable chunks
+      chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
+    }
 
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
     Preconditions.checkArgument(
@@ -166,6 +176,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
    * Try to retire the current chunk if it is still
    * <code>c</code>. Postcondition is that curChunk.get()
    * != c
+   * @param c the chunk to retire
+   * (no return value: the method is void, so the outcome of the race is not reported)
    */
   private void tryRetireChunk(Chunk c) {
     curChunk.compareAndSet(c, null);
@@ -197,7 +209,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
         // we won race - now we need to actually do the expensive
         // allocation step
         c.init();
-        this.chunkQueue.add(c);
+        if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+                + chunkQueue.size());
+          }
+        }
         return c;
       } else if (chunkPool != null) {
         chunkPool.putbackChunk(c);
@@ -212,6 +229,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
     return this.curChunk.get();
   }
 
+  @VisibleForTesting
+  BlockingQueue<Chunk> getChunkQueue() {
+    return this.chunkQueue; // null when this LAB was built without a chunk pool
+  }
+
   /**
    * A chunk of memory out of which allocations are sliced.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/3b3c3dc0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index 6285060..81b6046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -123,6 +124,13 @@ public class MemStoreChunkPool {
       return;
     }
     chunks.drainTo(reclaimedChunks, maxNumToPutback);
+    // clear reference of any non-reclaimable chunks
+    if (chunks.size() > 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue");
+      }
+      chunks.clear();
+    }
   }
 
   /**
@@ -217,4 +225,13 @@ public class MemStoreChunkPool {
     }
   }
 
+  int getMaxCount() {
+    return this.maxCount; // HeapMemStoreLAB uses this as its chunk queue capacity
+  }
+
+  @VisibleForTesting
+  static void clearDisableFlag() {
+    chunkPoolDisabled = false; // NOTE(review): presumably re-enables pool creation - confirm
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3b3c3dc0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index 170bdd4..34caf97 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -37,6 +39,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
+
 import org.junit.experimental.categories.Category;
 
 @Category({RegionServerTests.class, SmallTests.class})
@@ -149,7 +152,78 @@ public class TestMemStoreLAB {
     }
 
   }
-  
+
+  /**
+   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
+   * no chunk stays referenced by the chunk queue once the mslab is closed (HBASE-16195)
+   * @throws Exception if any error occurred
+   */
+  @Test
+  public void testLABChunkQueue() throws Exception {
+    HeapMemStoreLAB mslab = new HeapMemStoreLAB();
+    // by default setting, there should be no chunk queue initialized
+    assertNull(mslab.getChunkQueue());
+    // reset mslab with chunk pool
+    Configuration conf = HBaseConfiguration.create();
+    conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1);
+    // set chunk size to default max alloc size, so we could easily trigger chunk retirement
+    conf.setLong(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.MAX_ALLOC_DEFAULT);
+    // reconstruct mslab
+    MemStoreChunkPool.clearDisableFlag();
+    mslab = new HeapMemStoreLAB(conf);
+    // launch multiple threads to trigger frequent chunk retirement
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < 10; i++) {
+      threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i));
+    }
+    for (Thread thread : threads) {
+      thread.start();
+    }
+    // let it run for some time
+    Thread.sleep(1000);
+    for (Thread thread : threads) {
+      thread.interrupt();
+    }
+    boolean threadsRunning = true;
+    while (threadsRunning) {
+      threadsRunning = false; // assume all workers are done until a live one is found
+      for (Thread thread : threads) {
+        if (thread.isAlive()) {
+          threadsRunning = true;
+          break;
+        }
+      }
+    }
+    // every worker has exited, so nothing allocates concurrently with close()
+    mslab.close();
+    // make sure all chunks reclaimed or removed from chunk queue
+    int queueLength = mslab.getChunkQueue().size();
+    assertTrue("All chunks in chunk queue should be reclaimed or removed"
+        + " after mslab closed but actually: " + queueLength, queueLength == 0);
+  }
+
+  private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName) {
+    Thread thread = new Thread() {
+      volatile boolean stopped = false; // written by the caller of interrupt(), read by run()
+
+      @Override
+      public void run() {
+        while (!stopped) {
+          // keep triggering chunk retirement
+          mslab.allocateBytes(HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 1);
+        }
+      }
+
+      @Override
+      public void interrupt() {
+        this.stopped = true; // cooperative stop only; super.interrupt() is not called
+      }
+    };
+    thread.setName(threadName);
+    thread.setDaemon(true);
+    return thread;
+  }
+
   private static class AllocRecord implements Comparable<AllocRecord>{
     private final ByteRange alloc;
     private final int size;