Posted to commits@hbase.apache.org by an...@apache.org on 2016/11/24 15:18:04 UTC

[2/4] hbase git commit: HBASE-15786 Create DBB backed MSLAB pool.

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
new file mode 100644
index 0000000..e7520a2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.util;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
+import java.lang.management.MemoryUsage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Utility class to calculate memory sizes for the memstore and block cache (L1, L2) of a RS.
+ */
+@InterfaceAudience.Private
+public class MemorySizeUtil {
+
+  public static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
+  public static final String MEMSTORE_SIZE_OLD_KEY =
+      "hbase.regionserver.global.memstore.upperLimit";
+  public static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
+      "hbase.regionserver.global.memstore.size.lower.limit";
+  public static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
+      "hbase.regionserver.global.memstore.lowerLimit";
+  // Max global off heap memory that can be used for all memstores
+  // This should be an absolute value in MBs and not percent.
+  public static final String OFFHEAP_MEMSTORE_SIZE_KEY =
+      "hbase.regionserver.offheap.global.memstore.size";
+
+  public static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
+  // Default lower water mark limit is 95% of the memstore size.
+  public static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
+
+  private static final Log LOG = LogFactory.getLog(MemorySizeUtil.class);
+  // a constant to convert a fraction to a percentage
+  private static final int CONVERT_TO_PERCENTAGE = 100;
+
+  /**
+   * Checks whether we have enough heap memory left after the portions reserved for the memstore
+   * and block cache. We need at least 20% of the heap left for other RS functions.
+   * @param conf the RegionServer configuration to validate
+   */
+  public static void checkForClusterFreeHeapMemoryLimit(Configuration conf) {
+    if (conf.get(MEMSTORE_SIZE_OLD_KEY) != null) {
+      LOG.warn(MEMSTORE_SIZE_OLD_KEY + " is deprecated by " + MEMSTORE_SIZE_KEY);
+    }
+    float globalMemstoreSize = getGlobalMemStoreHeapPercent(conf, false);
+    int gml = (int)(globalMemstoreSize * CONVERT_TO_PERCENTAGE);
+    float blockCacheUpperLimit = getBlockCacheHeapPercent(conf);
+    int bcul = (int)(blockCacheUpperLimit * CONVERT_TO_PERCENTAGE);
+    if (CONVERT_TO_PERCENTAGE - (gml + bcul)
+            < (int)(CONVERT_TO_PERCENTAGE *
+                    HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) {
+      throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+          + "the threshold required for successful cluster operation. "
+          + "The combined value cannot exceed 0.8. Please check "
+          + "the settings for hbase.regionserver.global.memstore.size and "
+          + "hfile.block.cache.size in your configuration. "
+          + "hbase.regionserver.global.memstore.size is " + globalMemstoreSize
+          + " hfile.block.cache.size is " + blockCacheUpperLimit);
+    }
+  }
+
+  /**
+   * Retrieve the configured global memstore size as a percentage of the total heap.
+   * @param c
+   * @param logInvalid
+   */
+  public static float getGlobalMemStoreHeapPercent(final Configuration c,
+      final boolean logInvalid) {
+    float limit = c.getFloat(MEMSTORE_SIZE_KEY,
+        c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
+    if (limit > 0.8f || limit <= 0.0f) {
+      if (logInvalid) {
+        LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
+            + " because supplied value outside allowed range of (0 -> 0.8]");
+      }
+      limit = DEFAULT_MEMSTORE_SIZE;
+    }
+    return limit;
+  }
+
+  /**
+   * Retrieve configured size for global memstore lower water mark as fraction of global memstore
+   * size.
+   */
+  public static float getGlobalMemStoreHeapLowerMark(final Configuration conf,
+      boolean honorOldConfig) {
+    String lowMarkPercentStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+    if (lowMarkPercentStr != null) {
+      float lowMarkPercent = Float.parseFloat(lowMarkPercentStr);
+      if (lowMarkPercent > 1.0f) {
+        LOG.error("Bad configuration value for " + MEMSTORE_SIZE_LOWER_LIMIT_KEY + ": "
+            + lowMarkPercent + ". Using 1.0f instead.");
+        lowMarkPercent = 1.0f;
+      }
+      return lowMarkPercent;
+    }
+    if (!honorOldConfig) return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
+    String lowerWaterMarkOldValStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
+    if (lowerWaterMarkOldValStr != null) {
+      LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
+          + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+      float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
+      float upperMarkPercent = getGlobalMemStoreHeapPercent(conf, false);
+      if (lowerWaterMarkOldVal > upperMarkPercent) {
+        LOG.error("Value of " + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " (" + lowerWaterMarkOldVal
+            + ") is greater than global memstore limit (" + upperMarkPercent + ") set by "
+            + MEMSTORE_SIZE_KEY + "/" + MEMSTORE_SIZE_OLD_KEY + ". Setting memstore lower limit "
+            + "to " + upperMarkPercent);
+        lowerWaterMarkOldVal = upperMarkPercent;
+      }
+      return lowerWaterMarkOldVal / upperMarkPercent;
+    }
+    return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
+  }
+
+  /**
+   * @return Pair of the global memstore size and its memory type (i.e. on heap or off heap).
+   */
+  public static Pair<Long, MemoryType> getGlobalMemstoreSize(Configuration conf) {
+    long offheapMSGlobal = conf.getLong(OFFHEAP_MEMSTORE_SIZE_KEY, 0);// Size in MBs
+    if (offheapMSGlobal > 0) {
+      // The off heap memstore size has no relevance when MSLAB is turned OFF. We will split this
+      // entire size into chunks and pool them in the MemStoreChunkPool. We don't want to create
+      // that many off heap chunks on demand. In fact, when this off heap size is configured, we
+      // will go with 100% of this size as the pool size.
+      if (MemStoreLAB.isEnabled(conf)) {
+        // We are in off heap memstore mode
+        long globalMemStoreLimit = (long) (offheapMSGlobal * 1024 * 1024); // Size in bytes
+        return new Pair<Long, MemoryType>(globalMemStoreLimit, MemoryType.NON_HEAP);
+      } else {
+        // The off heap max memstore size is configured while MSLAB is turned off, which makes no
+        // sense. Log a warning and go with the on heap memstore percentage (40% of Xmx by default).
+        LOG.warn("There is no relevance of configuring '" + OFFHEAP_MEMSTORE_SIZE_KEY + "' when '"
+            + MemStoreLAB.USEMSLAB_KEY + "' is turned off."
+            + " Going with on heap global memstore size ('" + MEMSTORE_SIZE_KEY + "')");
+      }
+    }
+    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    float globalMemStorePercent = getGlobalMemStoreHeapPercent(conf, true);
+    return new Pair<Long, MemoryType>((long) (max * globalMemStorePercent), MemoryType.HEAP);
+  }
+
+  /**
+   * Retrieve the configured on heap block cache size as a percentage of the total heap.
+   * @param conf
+   */
+  public static float getBlockCacheHeapPercent(final Configuration conf) {
+    // L1 block cache is always on heap
+    float l1CachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
+        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+    float l2CachePercent = getL2BlockCacheHeapPercent(conf);
+    return l1CachePercent + l2CachePercent;
+  }
+
+  /**
+   * @param conf
+   * @return The on heap size for the L2 block cache as a percentage of the total heap.
+   */
+  public static float getL2BlockCacheHeapPercent(Configuration conf) {
+    float l2CachePercent = 0.0F;
+    String bucketCacheIOEngineName = conf.get(HConstants.BUCKET_CACHE_IOENGINE_KEY, null);
+    // L2 block cache can be on heap when IOEngine is "heap"
+    if (bucketCacheIOEngineName != null && bucketCacheIOEngineName.startsWith("heap")) {
+      float bucketCachePercentage = conf.getFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0F);
+      MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+      l2CachePercent = bucketCachePercentage < 1 ? bucketCachePercentage
+          : (bucketCachePercentage * 1024 * 1024) / mu.getMax();
+    }
+    return l2CachePercent;
+  }
+}

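To make the new checks concrete, here is a minimal, hedged sketch (the 0.45/0.40 values are purely illustrative, and the usual org.apache.hadoop.hbase imports are assumed) of how MemorySizeUtil behaves at RegionServer startup:

    Configuration conf = HBaseConfiguration.create();
    // 45% of heap for memstores + 40% for block cache leaves only 15% free,
    // below the 20% minimum required for other RS functions, so the check throws.
    conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.45f);
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.40f);
    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(conf); // RuntimeException
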
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
index d968ed9..2cbf0a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -17,34 +17,34 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 /**
  * A chunk of memory out of which allocations are sliced.
  */
 @InterfaceAudience.Private
-public class Chunk {
+public abstract class Chunk {
   /** Actual underlying data */
-  private byte[] data;
+  protected ByteBuffer data;
 
-  private static final int UNINITIALIZED = -1;
-  private static final int OOM = -2;
+  protected static final int UNINITIALIZED = -1;
+  protected static final int OOM = -2;
   /**
    * Offset for the next allocation, or the sentinel value -1 which implies that the chunk is still
    * uninitialized.
    */
-  private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+  protected AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
 
   /** Total number of allocations satisfied from this buffer */
-  private AtomicInteger allocCount = new AtomicInteger();
+  protected AtomicInteger allocCount = new AtomicInteger();
 
   /** Size of chunk in bytes */
-  private final int size;
+  protected final int size;
 
   /**
    * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
@@ -60,23 +60,7 @@ public class Chunk {
    * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
    * until the allocation is complete.
    */
-  public void init() {
-    assert nextFreeOffset.get() == UNINITIALIZED;
-    try {
-      if (data == null) {
-        data = new byte[size];
-      }
-    } catch (OutOfMemoryError e) {
-      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-      assert failInit; // should be true.
-      throw e;
-    }
-    // Mark that it's ready for use
-    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
-    // We should always succeed the above CAS since only one thread
-    // calls init()!
-    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
-  }
+  public abstract void init();
 
   /**
   * Reset the offset to UNINITIALIZED before reusing an old chunk
@@ -109,7 +93,7 @@ public class Chunk {
         return -1;
       }
 
-      if (oldOffset + size > data.length) {
+      if (oldOffset + size > data.capacity()) {
         return -1; // alloc doesn't fit
       }
 
@@ -126,14 +110,14 @@ public class Chunk {
   /**
    * @return This chunk's backing data.
    */
-  byte[] getData() {
+  ByteBuffer getData() {
     return this.data;
   }
 
   @Override
   public String toString() {
     return "Chunk@" + System.identityHashCode(this) + " allocs=" + allocCount.get() + " waste="
-        + (data.length - nextFreeOffset.get());
+        + (data.capacity() - nextFreeOffset.get());
   }
 
   @VisibleForTesting

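The concrete Chunk subclasses land elsewhere in this patch; as a hedged sketch (assuming the same Guava Preconditions import the old code used, and the Chunk(int size) constructor above), an on heap implementation of the now-abstract init() can mirror the removed byte[] logic, only swapping in a heap ByteBuffer. An off heap variant would differ in one line, using ByteBuffer.allocateDirect(size):

    @InterfaceAudience.Private
    public class OnheapChunk extends Chunk {
      OnheapChunk(int size) {
        super(size);
      }

      public void init() {
        assert nextFreeOffset.get() == UNINITIALIZED;
        try {
          if (data == null) {
            data = ByteBuffer.allocate(this.size); // heap backed buffer
          }
        } catch (OutOfMemoryError e) {
          boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
          assert failInit; // should be true.
          throw e;
        }
        // Mark that it's ready for use
        boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
        Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
      }
    }
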
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
index 1d237d0..1c7dfe2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
@@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
 import org.apache.hadoop.hbase.util.RollingStatCalculator;
@@ -109,6 +109,9 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
   private float globalMemStorePercentMaxRange;
   private float blockCachePercentMinRange;
   private float blockCachePercentMaxRange;
+
+  private float globalMemStoreLimitLowMarkPercent;
+
   // Store statistics about the corresponding parameters for memory tuning
   private RollingStatCalculator rollingStatsForCacheMisses;
   private RollingStatCalculator rollingStatsForFlushes;
@@ -165,11 +168,9 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
       newTuneDirection = StepDirection.NEUTRAL;
     }
     // Increase / decrease the memstore / block cache sizes depending on the new tuner step.
-    float globalMemstoreLowerMark = HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf,
-        curMemstoreSize);
     // We don't want to exert immediate pressure on memstore. So, we decrease its size gracefully;
     // we set a minimum bar in the middle of the total memstore size and the lower limit.
-    float minMemstoreSize = ((globalMemstoreLowerMark + 1) * curMemstoreSize) / 2.00f;
+    float minMemstoreSize = ((globalMemStoreLimitLowMarkPercent + 1) * curMemstoreSize) / 2.00f;
 
     switch (newTuneDirection) {
     case INCREASE_BLOCK_CACHE_SIZE:
@@ -365,9 +366,11 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
     this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
         conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
     this.globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
-        HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+        MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
     this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
-        HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+        MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
+    this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf,
+        true);
     // Default value of periods to ignore is number of lookup periods
     this.numPeriodsToIgnore = conf.getInt(NUM_PERIODS_TO_IGNORE, this.tunerLookupPeriods);
     this.rollingStatsForCacheMisses = new RollingStatCalculator(this.tunerLookupPeriods);

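As a worked example of the floor computed above: with curMemstoreSize = 0.40 and the lower mark at its 0.95 default, minMemstoreSize = ((0.95 + 1) * 0.40) / 2.00 = 0.39, i.e. the midpoint between the current size (0.40) and its lower limit (0.95 * 0.40 = 0.38), so one tuner step never shrinks the memstore below 0.39 of the heap.
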
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f074b0e..44350c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7028,7 +7028,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
     stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this
         .memstoreFlushSize)));
-    stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
+    if (rsServices.getHeapMemoryManager() != null) {
+      stats.setHeapOccupancy(
+          (int) (rsServices.getHeapMemoryManager().getHeapOccupancyPercent() * 100));
+    }
     stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 :
                 (int)rsServices.getCompactionPressure()*100);
     return stats.build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8e78422..56fc6eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
 import java.lang.management.MemoryUsage;
 import java.lang.reflect.Constructor;
 import java.net.BindException;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@@ -170,6 +172,7 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JSONBean;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
@@ -516,6 +519,7 @@ public class HRegionServer extends HasThread implements
     super("RegionServer");  // thread name
     this.fsOk = true;
     this.conf = conf;
+    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
     HFile.checkHFileVersion(this.conf);
     checkCodecs(this.conf);
     this.userProvider = UserProvider.instantiate(conf);
@@ -1451,6 +1455,8 @@ public class HRegionServer extends HasThread implements
 
       startServiceThreads();
       startHeapMemoryManager();
+      // Call it after starting HeapMemoryManager.
+      initializeMemStoreChunkPool();
       LOG.info("Serving as " + this.serverName +
         ", RpcServer on " + rpcServices.isa +
         ", sessionid=0x" +
@@ -1470,16 +1476,34 @@ public class HRegionServer extends HasThread implements
     }
   }
 
+  private void initializeMemStoreChunkPool() {
+    if (MemStoreLAB.isEnabled(conf)) {
+      // MSLAB is enabled, so initialize the MemStoreChunkPool. By this time, the
+      // MemStoreFlusher is already initialized and we can get the global limits
+      // from it.
+      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
+      long globalMemStoreSize = pair.getFirst();
+      boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
+      // When the off heap memstore is in use, use the entire area for the chunk pool.
+      float poolSizePercentage = offheap ? 1.0F
+          : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
+      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
+          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
+      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
+      MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
+          initialCountPercentage, chunkSize, offheap);
+      if (pool != null && this.hMemManager != null) {
+        // Register with Heap Memory manager
+        this.hMemManager.registerTuneObserver(pool);
+      }
+    }
+  }
+
   private void startHeapMemoryManager() {
-    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
-        this, this.regionServerAccounting);
+    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
+        this.regionServerAccounting);
     if (this.hMemManager != null) {
       this.hMemManager.start(getChoreService());
-      MemStoreChunkPool chunkPool = MemStoreChunkPool.getPool(this.conf);
-      if (chunkPool != null) {
-        // Register it as HeapMemoryTuneObserver
-        this.hMemManager.registerTuneObserver(chunkPool);
-      }
     }
   }
 
@@ -3523,11 +3547,6 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public HeapMemoryManager getHeapMemoryManager() {
-    return hMemManager;
-  }
-
-  @Override
   public double getCompactionPressure() {
     double max = 0;
     for (Region region : onlineRegions.values()) {
@@ -3541,6 +3560,11 @@ public class HRegionServer extends HasThread implements
     return max;
   }
 
+  @Override
+  public HeapMemoryManager getHeapMemoryManager() {
+    return hMemManager;
+  }
+
   /**
    * For testing
    * @return whether all wal roll request finished for this regionserver

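A hedged sketch of the configuration that steers initializeMemStoreChunkPool() toward an off heap pool; the 4096 MB figure is illustrative only:

    Configuration conf = HBaseConfiguration.create();
    // MSLAB on (the default); give memstores 4 GB of off heap space, expressed in MB.
    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, 4096);
    Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
    // pair.getFirst() == 4096L * 1024 * 1024 bytes and pair.getSecond() == NON_HEAP,
    // so poolSizePercentage becomes 1.0F and the whole area is pooled as chunks.
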
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
deleted file mode 100644
index 99b2bb6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * A memstore-local allocation buffer.
- * <p>
- * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
- * big (2MB) byte[] chunks from and then doles it out to threads that request
- * slices into the array.
- * <p>
- * The purpose of this class is to combat heap fragmentation in the
- * regionserver. By ensuring that all KeyValues in a given memstore refer
- * only to large chunks of contiguous memory, we ensure that large blocks
- * get freed up when the memstore is flushed.
- * <p>
- * Without the MSLAB, the byte array allocated during insertion end up
- * interleaved throughout the heap, and the old generation gets progressively
- * more fragmented until a stop-the-world compacting collection occurs.
- * <p>
- * TODO: we should probably benchmark whether word-aligning the allocations
- * would provide a performance improvement - probably would speed up the
- * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
- * anyway
- */
-@InterfaceAudience.Private
-public class HeapMemStoreLAB implements MemStoreLAB {
-
-  static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
-  static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
-  static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
-  static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
-                                                   // allocator
-
-  static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
-
-  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
-  // A queue of chunks from pool contained by this memstore LAB
-  // TODO: in the future, it would be better to have List implementation instead of Queue,
-  // as FIFO order is not so important here
-  @VisibleForTesting
-  BlockingQueue<PooledChunk> pooledChunkQueue = null;
-  private final int chunkSize;
-  private final int maxAlloc;
-  private final MemStoreChunkPool chunkPool;
-
-  // This flag is for closing this instance, its set when clearing snapshot of
-  // memstore
-  private volatile boolean closed = false;
-  // This flag is for reclaiming chunks. Its set when putting chunks back to
-  // pool
-  private AtomicBoolean reclaimed = new AtomicBoolean(false);
-  // Current count of open scanners which reading data from this MemStoreLAB
-  private final AtomicInteger openScannerCount = new AtomicInteger();
-
-  // Used in testing
-  public HeapMemStoreLAB() {
-    this(new Configuration());
-  }
-
-  public HeapMemStoreLAB(Configuration conf) {
-    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
-    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
-    this.chunkPool = MemStoreChunkPool.getPool(conf);
-    // currently chunkQueue is only used for chunkPool
-    if (this.chunkPool != null) {
-      // set queue length to chunk pool max count to avoid keeping reference of
-      // too many non-reclaimable chunks
-      pooledChunkQueue = new LinkedBlockingQueue<PooledChunk>(chunkPool.getMaxCount());
-    }
-
-    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
-    Preconditions.checkArgument(maxAlloc <= chunkSize,
-        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
-  }
-
-
-  @Override
-  public Cell copyCellInto(Cell cell) {
-    int size = KeyValueUtil.length(cell);
-    Preconditions.checkArgument(size >= 0, "negative size");
-    // Callers should satisfy large allocations directly from JVM since they
-    // don't cause fragmentation as badly.
-    if (size > maxAlloc) {
-      return null;
-    }
-    Chunk c = null;
-    int allocOffset = 0;
-    while (true) {
-      c = getOrMakeChunk();
-      // Try to allocate from this chunk
-      allocOffset = c.alloc(size);
-      if (allocOffset != -1) {
-        // We succeeded - this is the common case - small alloc
-        // from a big buffer
-        break;
-      }
-      // not enough space!
-      // try to retire this chunk
-      tryRetireChunk(c);
-    }
-    return KeyValueUtil.copyCellTo(cell, c.getData(), allocOffset, size);
-  }
-
-  /**
-   * Close this instance since it won't be used any more, try to put the chunks
-   * back to pool
-   */
-  @Override
-  public void close() {
-    this.closed = true;
-    // We could put back the chunks to pool for reusing only when there is no
-    // opening scanner which will read their data
-    if (chunkPool != null && openScannerCount.get() == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.pooledChunkQueue);
-    }
-  }
-
-  /**
-   * Called when opening a scanner on the data of this MemStoreLAB
-   */
-  @Override
-  public void incScannerCount() {
-    this.openScannerCount.incrementAndGet();
-  }
-
-  /**
-   * Called when closing a scanner on the data of this MemStoreLAB
-   */
-  @Override
-  public void decScannerCount() {
-    int count = this.openScannerCount.decrementAndGet();
-    if (this.closed && chunkPool != null && count == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.pooledChunkQueue);
-    }
-  }
-
-  /**
-   * Try to retire the current chunk if it is still
-   * <code>c</code>. Postcondition is that curChunk.get()
-   * != c
-   * @param c the chunk to retire
-   * @return true if we won the race to retire the chunk
-   */
-  private void tryRetireChunk(Chunk c) {
-    curChunk.compareAndSet(c, null);
-    // If the CAS succeeds, that means that we won the race
-    // to retire the chunk. We could use this opportunity to
-    // update metrics on external fragmentation.
-    //
-    // If the CAS fails, that means that someone else already
-    // retired the chunk for us.
-  }
-
-  /**
-   * Get the current chunk, or, if there is no current chunk,
-   * allocate a new one from the JVM.
-   */
-  private Chunk getOrMakeChunk() {
-    while (true) {
-      // Try to get the chunk
-      Chunk c = curChunk.get();
-      if (c != null) {
-        return c;
-      }
-
-      // No current chunk, so we want to allocate one. We race
-      // against other allocators to CAS in an uninitialized chunk
-      // (which is cheap to allocate)
-      if (chunkPool != null) {
-        c = chunkPool.getChunk();
-      }
-      boolean pooledChunk = false;
-      if (c != null) {
-        // This is chunk from pool
-        pooledChunk = true;
-      } else {
-        c = new Chunk(chunkSize);
-      }
-      if (curChunk.compareAndSet(null, c)) {
-        // we won race - now we need to actually do the expensive
-        // allocation step
-        c.init();
-        if (pooledChunk) {
-          if (!this.closed && !this.pooledChunkQueue.offer((PooledChunk) c)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
-                  + pooledChunkQueue.size());
-            }
-          }
-        }
-        return c;
-      } else if (pooledChunk) {
-        chunkPool.putbackChunk((PooledChunk) c);
-      }
-      // someone else won race - that's fine, we'll try to grab theirs
-      // in the next iteration of the loop.
-    }
-  }
-
-  @VisibleForTesting
-  Chunk getCurrentChunk() {
-    return this.curChunk.get();
-  }
-
-
-  BlockingQueue<PooledChunk> getPooledChunks() {
-    return this.pooledChunkQueue;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 7646293..a2f546a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -36,13 +36,15 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Manages tuning of Heap memory using <code>HeapMemoryTuner</code>.
+ * Manages tuning of heap memory using <code>HeapMemoryTuner</code>. Most of the heap memory is
+ * split between the memstores and the BlockCache. This manager helps tune the sizes of both
+ * dynamically, as per the R/W load on the servers.
  */
 @InterfaceAudience.Private
 public class HeapMemoryManager {
@@ -91,7 +93,7 @@ public class HeapMemoryManager {
   private List<HeapMemoryTuneObserver> tuneObservers = new ArrayList<HeapMemoryTuneObserver>();
 
   public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
-                Server server, RegionServerAccounting regionServerAccounting) {
+      Server server, RegionServerAccounting regionServerAccounting) {
     ResizableBlockCache l1Cache = CacheConfig.getL1(conf);
     if (l1Cache != null) {
       return new HeapMemoryManager(l1Cache, memStoreFlusher, server, regionServerAccounting);
@@ -117,10 +119,10 @@ public class HeapMemoryManager {
 
   private boolean doInit(Configuration conf) {
     boolean tuningEnabled = true;
-    globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false);
+    globalMemStorePercent = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
     blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
         HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
-    HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
+    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(conf);
     // Initialize max and min range for memstore heap space
     globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
         globalMemStorePercent);
@@ -128,14 +130,14 @@ public class HeapMemoryManager {
         globalMemStorePercent);
     if (globalMemStorePercent < globalMemStorePercentMinRange) {
       LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
-          + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+          + ", same value as " + MemorySizeUtil.MEMSTORE_SIZE_KEY
           + " because supplied value greater than initial memstore size value.");
       globalMemStorePercentMinRange = globalMemStorePercent;
       conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
     }
     if (globalMemStorePercent > globalMemStorePercentMaxRange) {
       LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
-          + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+          + ", same value as " + MemorySizeUtil.MEMSTORE_SIZE_KEY
           + " because supplied value less than initial memstore size value.");
       globalMemStorePercentMaxRange = globalMemStorePercent;
       conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
@@ -167,7 +169,7 @@ public class HeapMemoryManager {
     }
 
     int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
-    this.l2BlockCachePercent = HeapMemorySizeUtil.getL2BlockCacheHeapPercent(conf);
+    this.l2BlockCachePercent = MemorySizeUtil.getL2BlockCacheHeapPercent(conf);
     int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
     if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
       throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
@@ -340,7 +342,7 @@ public class HeapMemoryManager {
         if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
           LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
               + "the threshold required for successful cluster operation. "
-              + "The combined value cannot exceed 0.8. " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+              + "The combined value cannot exceed 0.8. " + MemorySizeUtil.MEMSTORE_SIZE_KEY
               + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
               + blockCacheSize);
           // TODO can adjust the value so as not exceed 80%. Is that correct? may be.

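As a worked example of the range clamping in doInit(): with hbase.regionserver.global.memstore.size = 0.4 and MEMSTORE_SIZE_MIN_RANGE_KEY = 0.45, globalMemStorePercent (0.4) is below the supplied min range (0.45), so the min range is clamped back to 0.4 and the first warning above is logged; the tuner then has no headroom to shrink the memstore below its initial size.
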
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index db2cd18..926dd7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.lang.management.ManagementFactory;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
 import org.apache.hadoop.util.StringUtils;
 
@@ -45,7 +42,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * collection on JVM.
  * 
  * The pool instance is globally unique and could be obtained through
- * {@link MemStoreChunkPool#getPool(Configuration)}
+ * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)}
  * 
  * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
  * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
@@ -55,10 +52,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 @InterfaceAudience.Private
 public class MemStoreChunkPool implements HeapMemoryTuneObserver {
   private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
-  final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
-  final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
-  final static float POOL_MAX_SIZE_DEFAULT = 1.0f;
-  final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
 
   // Static reference to the MemStoreChunkPool
   static MemStoreChunkPool GLOBAL_INSTANCE;
@@ -68,7 +61,7 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
   private int maxCount;
 
   // A queue of reclaimed chunks
-  private final BlockingQueue<PooledChunk> reclaimedChunks;
+  private final BlockingQueue<Chunk> reclaimedChunks;
   private final int chunkSize;
   private final float poolSizePercentage;
 
@@ -78,15 +71,17 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
   private static final int statThreadPeriod = 60 * 5;
   private final AtomicLong chunkCount = new AtomicLong();
   private final AtomicLong reusedChunkCount = new AtomicLong();
+  private final boolean offheap;
 
-  MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
-      int initialCount, float poolSizePercentage) {
+  MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage,
+      boolean offheap) {
     this.maxCount = maxCount;
     this.chunkSize = chunkSize;
     this.poolSizePercentage = poolSizePercentage;
-    this.reclaimedChunks = new LinkedBlockingQueue<PooledChunk>();
+    this.offheap = offheap;
+    this.reclaimedChunks = new LinkedBlockingQueue<>();
     for (int i = 0; i < initialCount; i++) {
-      PooledChunk chunk = new PooledChunk(chunkSize);
+      Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize);
       chunk.init();
       reclaimedChunks.add(chunk);
     }
@@ -108,8 +103,8 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
    * @see #putbackChunk(Chunk)
    * @see #putbackChunks(BlockingQueue)
    */
-  PooledChunk getChunk() {
-    PooledChunk chunk = reclaimedChunks.poll();
+  Chunk getChunk() {
+    Chunk chunk = reclaimedChunks.poll();
     if (chunk != null) {
       chunk.reset();
       reusedChunkCount.incrementAndGet();
@@ -118,7 +113,7 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
       while (true) {
         long created = this.chunkCount.get();
         if (created < this.maxCount) {
-          chunk = new PooledChunk(chunkSize);
+          chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize);
           if (this.chunkCount.compareAndSet(created, created + 1)) {
             break;
           }
@@ -135,9 +130,9 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
    * skip the remaining chunks
    * @param chunks
    */
-  synchronized void putbackChunks(BlockingQueue<PooledChunk> chunks) {
+  synchronized void putbackChunks(BlockingQueue<Chunk> chunks) {
     int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
-    PooledChunk chunk = null;
+    Chunk chunk = null;
     while ((chunk = chunks.poll()) != null && toAdd > 0) {
       reclaimedChunks.add(chunk);
       toAdd--;
@@ -149,7 +144,7 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
    * skip it
    * @param chunk
    */
-  synchronized void putbackChunk(PooledChunk chunk) {
+  synchronized void putbackChunk(Chunk chunk) {
     if (reclaimedChunks.size() < this.maxCount) {
       reclaimedChunks.add(chunk);
     }
@@ -191,51 +186,41 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
   }
 
   /**
-   * @param conf
    * @return the global MemStoreChunkPool instance
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DC_DOUBLECHECK",
-      justification="Intentional")
-  static MemStoreChunkPool getPool(Configuration conf) {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
+      justification = "Method is called by single thread at the starting of RS")
+  static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
+      float initialCountPercentage, int chunkSize, boolean offheap) {
     if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
+    if (chunkPoolDisabled) return null;
 
-    synchronized (MemStoreChunkPool.class) {
-      if (chunkPoolDisabled) return null;
-      if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
-      // When MSLAB is turned OFF no need to init chunk pool at all.
-      if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
-        chunkPoolDisabled = true;
-        return null;
-      }
-      float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
-      if (poolSizePercentage <= 0) {
-        chunkPoolDisabled = true;
-        return null;
-      }
-      if (poolSizePercentage > 1.0) {
-        throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
-      }
-      long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
-      long globalMemStoreLimit = (long) (heapMax * HeapMemorySizeUtil.getGlobalMemStorePercent(conf,
-          false));
-      int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY,
-          HeapMemStoreLAB.CHUNK_SIZE_DEFAULT);
-      int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
-
-      float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
-          POOL_INITIAL_SIZE_DEFAULT);
-      if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
-        throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
-            + " must be between 0.0 and 1.0");
-      }
-
-      int initialCount = (int) (initialCountPercentage * maxCount);
-      LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
-          + ", max count " + maxCount + ", initial count " + initialCount);
-      GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount,
-          poolSizePercentage);
-      return GLOBAL_INSTANCE;
+    if (poolSizePercentage <= 0) {
+      chunkPoolDisabled = true;
+      return null;
+    }
+    if (poolSizePercentage > 1.0) {
+      throw new IllegalArgumentException(
+          MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
+    }
+    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
+    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+      throw new IllegalArgumentException(
+          MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
     }
+    int initialCount = (int) (initialCountPercentage * maxCount);
+    LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
+        + ", max count " + maxCount + ", initial count " + initialCount);
+    GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
+        offheap);
+    return GLOBAL_INSTANCE;
+  }
+
+  /**
+   * @return The singleton instance of this pool.
+   */
+  static MemStoreChunkPool getPool() {
+    return GLOBAL_INSTANCE;
   }
 
   int getMaxCount() {
@@ -247,12 +232,6 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
     chunkPoolDisabled = false;
   }
 
-  public static class PooledChunk extends Chunk {
-    PooledChunk(int size) {
-      super(size);
-    }
-  }
-
   @Override
   public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
     int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);

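To make the sizing in the new initialize() concrete, a small hedged example with illustrative numbers:

    // 4 GB global off heap memstore, fully pooled, default 2 MB chunks.
    long globalMemStoreSize = 4L * 1024 * 1024 * 1024;
    MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize,
        1.0f /* poolSizePercentage */, 0.0f /* initialCountPercentage */,
        MemStoreLAB.CHUNK_SIZE_DEFAULT /* 2 MB */, true /* offheap */);
    // maxCount = 4 GB * 1.0 / 2 MB = 2048 chunks; initialCount = 0, so chunks are
    // created lazily by getChunk() up to that cap and recycled thereafter.
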
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 2f4d225..15cf97c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -24,7 +24,7 @@ import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -49,11 +49,12 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
@@ -109,12 +110,21 @@ class MemStoreFlusher implements FlushRequester {
     this.conf = conf;
     this.server = server;
     this.threadWakeFrequency =
-      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
-    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
-    float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
-    this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
-    this.globalMemStoreLimitLowMarkPercent =
-        HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
+        conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+    Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
+    this.globalMemStoreLimit = pair.getFirst();
+    boolean onheap = pair.getSecond() == MemoryType.HEAP;
+    // When the off heap memstore is in use, we configure the global off heap space for the
+    // memstore in bytes, not as a % of the max memory size. In that case, the lower water mark
+    // should be specified using the key "hbase.regionserver.global.memstore.size.lower.limit",
+    // which is a % of the global upper bound and defaults to 95%. Specifying it this way is
+    // ideal in the on heap case too, but in the past we took the lower bound as a % of Xmx as
+    // well (38% by default). For backward compatibility we fall back to reading that deprecated
+    // config when the new one is missing; we do so only in the on heap case, since for off heap
+    // it makes no sense. TODO: when can we get rid of the deprecated
+    // "hbase.regionserver.global.memstore.lowerLimit"? We can drop this boolean parameter then.
+    this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf,
+        onheap);
     this.globalMemStoreLimitLowMark =
         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
 
@@ -126,7 +136,7 @@ class MemStoreFlusher implements FlushRequester {
         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
         + ", globalMemStoreLimitLowMark="
         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
-        + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
+        + ", Offheap=" + !onheap);
   }
 
   public LongAdder getUpdatesBlockedMsHighWater() {

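For instance, with a 4 GB globalMemStoreLimit and the 0.95 default lower mark, globalMemStoreLimitLowMark = 4 GB * 0.95 = 3.8 GB; once flushes are forced at the upper limit, flushing continues until global memstore usage drops back under that mark.
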
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index 706e243..f6d1607 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * A memstore-local allocation buffer.
@@ -46,6 +48,19 @@ public interface MemStoreLAB {
 
   String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
   boolean USEMSLAB_DEFAULT = true;
+  String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
+
+  String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
+  int CHUNK_SIZE_DEFAULT = 2048 * 1024;
+  String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
+  int MAX_ALLOC_DEFAULT = 256 * 1024; // allocations bigger than this don't go
+                                      // through the allocator
+
+  // MSLAB pool related configs
+  String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
+  String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
+  float POOL_MAX_SIZE_DEFAULT = 1.0f;
+  float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
 
   /**
   * Allocates a slice in this LAB and copies the passed Cell into that area. Returns a new Cell instance
@@ -68,4 +83,17 @@ public interface MemStoreLAB {
    */
   void decScannerCount();
 
+  public static MemStoreLAB newInstance(Configuration conf) {
+    MemStoreLAB memStoreLAB = null;
+    if (isEnabled(conf)) {
+      String className = conf.get(MSLAB_CLASS_NAME, MemStoreLABImpl.class.getName());
+      memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class[] { Configuration.class }, new Object[] { conf });
+    }
+    return memStoreLAB;
+  }
+
+  public static boolean isEnabled(Configuration conf) {
+    return conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT);
+  }
 }

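A hedged usage sketch of the new factory method; MyMemStoreLAB is hypothetical and must expose a (Configuration) constructor for the reflective instantiation to work:

    // Plug in a custom MSLAB implementation (MyMemStoreLAB is hypothetical).
    conf.set(MemStoreLAB.MSLAB_CLASS_NAME, MyMemStoreLAB.class.getName());
    MemStoreLAB lab = MemStoreLAB.newInstance(conf); // null when MSLAB is disabled
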
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
new file mode 100644
index 0000000..30e4311
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -0,0 +1,243 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * A memstore-local allocation buffer.
+ * <p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
+ * big (2MB) chunks and then doles out slices of them to threads that request
+ * memory for their Cells.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * regionserver. By ensuring that all Cells in a given memstore refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the memstore is flushed.
+ * <p>
+ * Without the MSLAB, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * TODO: we should probably benchmark whether word-aligning the allocations
+ * would provide a performance improvement - probably would speed up the
+ * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
+ * anyway.
+ * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}.
+ * When a Chunk comes from the pool, it can be backed by either on heap or off heap memory.
+ * The chunks which this MemStoreLAB creates on its own (when no chunk is available from the
+ * pool) are always on heap backed.
+ */
+@InterfaceAudience.Private
+public class MemStoreLABImpl implements MemStoreLAB {
+
+  static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class);
+
+  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+  // A queue of chunks from the pool that are held by this memstore LAB
+  // TODO: in the future, it would be better to have a List implementation instead of a Queue,
+  // as FIFO order is not so important here
+  @VisibleForTesting
+  BlockingQueue<Chunk> pooledChunkQueue = null;
+  private final int chunkSize;
+  private final int maxAlloc;
+  private final MemStoreChunkPool chunkPool;
+
+  // This flag is for closing this instance. It is set when clearing the snapshot of the
+  // memstore
+  private volatile boolean closed = false;
+  // This flag is for reclaiming chunks. It is set when putting chunks back to the
+  // pool
+  private AtomicBoolean reclaimed = new AtomicBoolean(false);
+  // Current count of open scanners which are reading data from this MemStoreLAB
+  private final AtomicInteger openScannerCount = new AtomicInteger();
+
+  // Used in testing
+  public MemStoreLABImpl() {
+    this(new Configuration());
+  }
+
+  public MemStoreLABImpl(Configuration conf) {
+    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
+    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
+    this.chunkPool = MemStoreChunkPool.getPool();
+    // currently the pooledChunkQueue is only used when a chunkPool is present
+    if (this.chunkPool != null) {
+      // set the queue length to the chunk pool max count to avoid keeping references to
+      // too many non-reclaimable chunks
+      pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
+    }
+
+    // if we don't exclude allocations bigger than the chunk size, we'd loop forever on one!
+    Preconditions.checkArgument(maxAlloc <= chunkSize,
+        MAX_ALLOC_KEY + " must be less than or equal to " + CHUNK_SIZE_KEY);
+  }
+
+  @Override
+  public Cell copyCellInto(Cell cell) {
+    int size = KeyValueUtil.length(cell);
+    Preconditions.checkArgument(size >= 0, "negative size");
+    // Callers should satisfy large allocations directly from JVM since they
+    // don't cause fragmentation as badly.
+    if (size > maxAlloc) {
+      return null;
+    }
+    Chunk c = null;
+    int allocOffset = 0;
+    while (true) {
+      c = getOrMakeChunk();
+      // Try to allocate from this chunk
+      allocOffset = c.alloc(size);
+      if (allocOffset != -1) {
+        // We succeeded - this is the common case - small alloc
+        // from a big buffer
+        break;
+      }
+      // not enough space!
+      // try to retire this chunk
+      tryRetireChunk(c);
+    }
+    return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
+  }
+
+  /**
+   * Close this instance since it won't be used any more, and try to put the chunks
+   * back to the pool
+   */
+  @Override
+  public void close() {
+    this.closed = true;
+    // We can put the chunks back to the pool for reuse only when there is no
+    // open scanner which may still read their data
+    if (chunkPool != null && openScannerCount.get() == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.pooledChunkQueue);
+    }
+  }
+
+  /**
+   * Called when opening a scanner on the data of this MemStoreLAB
+   */
+  @Override
+  public void incScannerCount() {
+    this.openScannerCount.incrementAndGet();
+  }
+
+  /**
+   * Called when closing a scanner on the data of this MemStoreLAB
+   */
+  @Override
+  public void decScannerCount() {
+    int count = this.openScannerCount.decrementAndGet();
+    if (this.closed && chunkPool != null && count == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.pooledChunkQueue);
+    }
+  }
+
+  /**
+   * Try to retire the current chunk if it is still
+   * <code>c</code>. Postcondition is that curChunk.get()
+   * != c
+   * @param c the chunk to retire
+   */
+  private void tryRetireChunk(Chunk c) {
+    curChunk.compareAndSet(c, null);
+    // If the CAS succeeds, that means that we won the race
+    // to retire the chunk. We could use this opportunity to
+    // update metrics on external fragmentation.
+    //
+    // If the CAS fails, that means that someone else already
+    // retired the chunk for us.
+  }
+
+  /**
+   * Get the current chunk, or, if there is no current chunk,
+   * allocate a new one either from the pool or from the JVM.
+   */
+  private Chunk getOrMakeChunk() {
+    while (true) {
+      // Try to get the chunk
+      Chunk c = curChunk.get();
+      if (c != null) {
+        return c;
+      }
+
+      // No current chunk, so we want to allocate one. We race
+      // against other allocators to CAS in an uninitialized chunk
+      // (which is cheap to allocate)
+      if (chunkPool != null) {
+        c = chunkPool.getChunk();
+      }
+      boolean pooledChunk = false;
+      if (c != null) {
+        // This is chunk from pool
+        pooledChunk = true;
+      } else {
+        c = new OnheapChunk(chunkSize); // When the chunk is not from the pool, always make it on heap.
+      }
+      if (curChunk.compareAndSet(null, c)) {
+        // we won race - now we need to actually do the expensive
+        // allocation step
+        c.init();
+        if (pooledChunk) {
+          if (!this.closed && !this.pooledChunkQueue.offer(c)) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+                  + pooledChunkQueue.size());
+            }
+          }
+        }
+        return c;
+      } else if (pooledChunk) {
+        chunkPool.putbackChunk(c);
+      }
+      // someone else won race - that's fine, we'll try to grab theirs
+      // in the next iteration of the loop.
+    }
+  }
+
+  @VisibleForTesting
+  Chunk getCurrentChunk() {
+    return this.curChunk.get();
+  }
+
+  BlockingQueue<Chunk> getPooledChunks() {
+    return this.pooledChunkQueue;
+  }
+}

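To illustrate the lifecycle implemented above (allocation, scanner reference counting, and chunk reclamation on close), here is a minimal sketch of a caller; the class name and the KeyValue contents are made up, and the snippet is for illustration only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MslabLifecycleExample { // hypothetical class, for illustration only
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
        MemStoreLAB lab = MemStoreLAB.newInstance(conf);
        Cell kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("value"));
        // Copies the cell into the current chunk; returns null when the cell
        // exceeds the max allocation size and should stay a plain JVM object.
        Cell copied = lab.copyCellInto(kv);
        if (copied == null) {
          copied = kv;
        }
        lab.incScannerCount(); // a scanner starts reading LAB-backed data
        lab.close();           // closed, but chunks cannot be reclaimed yet
        lab.decScannerCount(); // last reader done; pooled chunks go back now
      }
    }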
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
new file mode 100644
index 0000000..ed98cfa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An off heap chunk implementation.
+ */
+@InterfaceAudience.Private
+public class OffheapChunk extends Chunk {
+
+  OffheapChunk(int size) {
+    super(size);
+  }
+
+  @Override
+  public void init() {
+    assert nextFreeOffset.get() == UNINITIALIZED;
+    try {
+      if (data == null) {
+        data = ByteBuffer.allocateDirect(this.size);
+      }
+    } catch (OutOfMemoryError e) {
+      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+      assert failInit; // should be true.
+      throw e;
+    }
+    // Mark that it's ready for use
+    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+    // The above CAS should always succeed since only one thread
+    // calls init()!
+    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
new file mode 100644
index 0000000..bd33cb5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An on heap chunk implementation.
+ */
+@InterfaceAudience.Private
+public class OnheapChunk extends Chunk {
+
+  OnheapChunk(int size) {
+    super(size);
+  }
+
+  @Override
+  public void init() {
+    assert nextFreeOffset.get() == UNINITIALIZED;
+    try {
+      if (data == null) {
+        data = ByteBuffer.allocate(this.size);
+      }
+    } catch (OutOfMemoryError e) {
+      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+      assert failInit; // should be true.
+      throw e;
+    }
+    // Mark that it's ready for use
+    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+    // The above CAS should always succeed since only one thread
+    // calls init()!
+    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+  }
+}

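The two chunk implementations above differ only in how init() materializes the ByteBuffer (heap vs. direct); the bump-the-pointer alloc() they rely on lives in the Chunk base class, which is not shown in this part of the diff. For orientation, a simplified sketch of what that allocation loop is assumed to look like (the field and constant names follow the usages above; the rest is an assumption, not the patch's code):

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicInteger;

    // Simplified sketch of the assumed Chunk base class, not the actual code.
    abstract class ChunkSketch {
      static final int UNINITIALIZED = -1; // init() has not run yet
      static final int OOM = -2;           // init() failed with OutOfMemoryError
      protected ByteBuffer data;
      protected final int size;
      protected final AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

      ChunkSketch(int size) { this.size = size; }

      public abstract void init();

      /** Try to allocate len bytes; returns the offset, or -1 if it won't fit. */
      public int alloc(int len) {
        while (true) {
          int oldOffset = nextFreeOffset.get();
          if (oldOffset == UNINITIALIZED || oldOffset == OOM) {
            return -1; // chunk not usable
          }
          if (oldOffset + len > size) {
            return -1; // doesn't fit; the caller retires this chunk
          }
          // Bump the pointer atomically; if we lose the CAS race, try again.
          if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + len)) {
            return oldOffset;
          }
        }
      }
    }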
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 554b29a..a61a9f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -45,7 +45,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.ByteBufferCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
@@ -1174,8 +1174,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         // Since byte buffers can point all kinds of crazy places it's harder to keep track
         // of which blocks are kept alive by what byte buffer.
         // So we make a guess.
-        if (c instanceof ByteBufferedCell) {
-          ByteBufferedCell bbCell = (ByteBufferedCell) c;
+        if (c instanceof ByteBufferCell) {
+          ByteBufferCell bbCell = (ByteBufferCell) c;
           ByteBuffer bb = bbCell.getValueByteBuffer();
           if (bb != lastBlock) {
             context.incrementResponseBlockSize(bb.capacity());

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index fa8860a..01e07ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,8 +34,6 @@ import java.util.List;
 @InterfaceAudience.Private
 public final class SegmentFactory {
 
-  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
-
   private SegmentFactory() {}
   private static SegmentFactory instance = new SegmentFactory();
 
@@ -47,7 +44,7 @@ public final class SegmentFactory {
   // create skip-list-based (non-flat) immutable segment from compacting old immutable segments
   public ImmutableSegment createImmutableSegment(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator) {
-    return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf));
+    return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
   }
 
   // create new flat immutable segment from compacting old immutable segments
@@ -57,7 +54,7 @@ public final class SegmentFactory {
       throws IOException {
     Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
         "wrong immutable segment type");
-    MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+    MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
     return
         // the last parameter "false" means not to merge, but to compact the pipeline
         // in order to create the new segment
@@ -77,7 +74,7 @@ public final class SegmentFactory {
 
   // create mutable segment
   public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) {
-    MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+    MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
     return generateMutableSegment(conf, comparator, memStoreLAB);
   }
 
@@ -103,16 +100,6 @@ public final class SegmentFactory {
     return new MutableSegment(set, comparator, memStoreLAB);
   }
 
-  private MemStoreLAB getMemStoreLAB(Configuration conf) {
-    MemStoreLAB memStoreLAB = null;
-    if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
-      String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
-      memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className,
-          new Class[] { Configuration.class }, new Object[] { conf });
-    }
-    return memStoreLAB;
-  }
-
   private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
     List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
     for (ImmutableSegment segment : segments) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index aa57881..e7c4f98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -386,8 +386,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
     this.logrollsize = (long) (blocksize
         * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
 
-    float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat(
-      HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
+    float memstoreRatio = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
     boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
     if (maxLogsDefined) {
       LOG.warn("'hbase.regionserver.maxlogs' was deprecated.");

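The replaced lines make explicit what the new helper centralizes: a lookup of the new memstore-size key with a fallback to the deprecated one. A plausible sketch of the core of MemorySizeUtil.getGlobalMemStoreHeapPercent, ignoring whatever range validation and deprecation logging the real method adds, would be:

    public static float getGlobalMemStoreHeapPercent(Configuration conf,
        boolean logInvalid) {
      // Prefer the new key; fall back to the deprecated key, then the default.
      return conf.getFloat(MEMSTORE_SIZE_KEY,
          conf.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
    }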
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
index 059d717..48a5b6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
@@ -27,7 +27,7 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TestCellUtil.ByteBufferedCellImpl;
+import org.apache.hadoop.hbase.TestCellUtil.ByteBufferCellImpl;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -108,7 +108,7 @@ public class TestSingleColumnValueFilter {
     assertTrue("less than", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
     filter.reset();
     byte[] buffer = kv.getBuffer();
-    Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("less than", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
     filter.reset();
 
@@ -117,7 +117,7 @@ public class TestSingleColumnValueFilter {
     assertTrue("Equals 100", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
     filter.reset();
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("Equals 100", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
     filter.reset();
 
@@ -126,7 +126,7 @@ public class TestSingleColumnValueFilter {
     assertTrue("include 120", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     filter.reset();
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("include 120", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
   }
 
@@ -135,29 +135,29 @@ public class TestSingleColumnValueFilter {
     KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_2);
     assertTrue("basicFilter1", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     byte[] buffer = kv.getBuffer();
-    Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("basicFilter1", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_3);
     assertTrue("basicFilter2", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("basicFilter2", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_4);
     assertTrue("basicFilter3", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("basicFilter3", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     assertFalse("basicFilterNotNull", filter.filterRow());
     filter.reset();
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
     assertTrue("basicFilter4", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("basicFilter4", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_2);
     assertTrue("basicFilter4", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("basicFilter4", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
     assertFalse("basicFilterAllRemaining", filter.filterAllRemaining());
     assertTrue("basicFilterNotNull", filter.filterRow());
@@ -166,12 +166,12 @@ public class TestSingleColumnValueFilter {
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
     assertTrue("basicFilter5", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("basicFilter5", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_2);
     assertTrue("basicFilter5", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("basicFilter5", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     assertFalse("basicFilterNotNull", filter.filterRow());
   }
@@ -181,14 +181,14 @@ public class TestSingleColumnValueFilter {
     KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, FULLSTRING_1);
     assertTrue("null1", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     byte[] buffer = kv.getBuffer();
-    Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("null1", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     assertFalse("null1FilterRow", filter.filterRow());
     filter.reset();
     kv = new KeyValue(ROW, COLUMN_FAMILY, Bytes.toBytes("qual2"), FULLSTRING_2);
     assertTrue("null2", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("null2", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     assertTrue("null2FilterRow", filter.filterRow());
   }
@@ -200,13 +200,13 @@ public class TestSingleColumnValueFilter {
     assertTrue("substrTrue",
       filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     byte[] buffer = kv.getBuffer();
-    Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("substrTrue", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER,
       FULLSTRING_2);
     assertTrue("substrFalse", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("substrFalse", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     assertFalse("substrFilterAllRemaining", filter.filterAllRemaining());
     assertFalse("substrFilterNotNull", filter.filterRow());
@@ -219,13 +219,13 @@ public class TestSingleColumnValueFilter {
     assertTrue("regexTrue",
       filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     byte[] buffer = kv.getBuffer();
-    Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("regexTrue", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER,
       FULLSTRING_2);
     assertTrue("regexFalse", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     buffer = kv.getBuffer();
-    c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("regexFalse", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     assertFalse("regexFilterAllRemaining", filter.filterAllRemaining());
     assertFalse("regexFilterNotNull", filter.filterRow());
@@ -238,7 +238,7 @@ public class TestSingleColumnValueFilter {
     assertTrue("regexTrue",
       filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
     byte[] buffer = kv.getBuffer();
-    Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+    Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
     assertTrue("regexTrue", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
     assertFalse("regexFilterAllRemaining", filter.filterAllRemaining());
     assertFalse("regexFilterNotNull", filter.filterRow());

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 62506ad..3b4d068 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -44,7 +44,6 @@ public class TestCellFlatSet extends TestCase {
   private Cell descCells[];
   private CellArrayMap descCbOnHeap;
   private final static Configuration CONF = new Configuration();
-  private HeapMemStoreLAB mslab;
   private KeyValue lowerOuterCell;
   private KeyValue upperOuterCell;
 
@@ -73,9 +72,8 @@ public class TestCellFlatSet extends TestCase {
     descCells = new Cell[] {kv4,kv3,kv2,kv1};
     descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true);
     CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
-    CONF.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
+    CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
     MemStoreChunkPool.chunkPoolDisabled = false;
-    mslab = new HeapMemStoreLAB(CONF);
   }
 
   /* Create and test CellSet based on CellArrayMap */

http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 4f2b12f..d1bbd50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -87,7 +89,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     super.internalSetUp();
     Configuration conf = new Configuration();
     conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
-    conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
+    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
     conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
     HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
@@ -95,7 +97,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     this.regionServicesForStores = region.getRegionServicesForStores();
     this.store = new HStore(region, hcd, conf);
 
-    chunkPool = MemStoreChunkPool.getPool(conf);
+    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
+    chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
+        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
     assertTrue(chunkPool != null);
   }
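Since MemStoreChunkPool is no longer sized implicitly from a Configuration, the test derives the global memstore limit itself before calling initialize(). The same pattern applies wherever a pool must be created explicitly; the parameter comments below are inferred from the call site above rather than from the pool's source:

    long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    float memstorePercent = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
    long globalMemStoreLimit = (long) (maxHeap * memstorePercent);
    MemStoreChunkPool pool = MemStoreChunkPool.initialize(
        globalMemStoreLimit,                   // bytes the pool may draw from
        0.2f,                                  // fraction of that kept as pooled chunks
        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, // start with no pre-allocated chunks
        MemStoreLABImpl.CHUNK_SIZE_DEFAULT,    // 2MB chunks
        false);                                // on-heap chunks; true would request the new DBB-backed pool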