You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/11/24 15:18:04 UTC
[2/4] hbase git commit: HBASE-15786 Create DBB backed MSLAB pool.
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
new file mode 100644
index 0000000..e7520a2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.util;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
+import java.lang.management.MemoryUsage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Util class to calculate memory size for memstore, block cache(L1, L2) of RS.
+ */
+@InterfaceAudience.Private
+public class MemorySizeUtil {
+
+ public static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
+ public static final String MEMSTORE_SIZE_OLD_KEY =
+ "hbase.regionserver.global.memstore.upperLimit";
+ public static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
+ "hbase.regionserver.global.memstore.size.lower.limit";
+ public static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
+ "hbase.regionserver.global.memstore.lowerLimit";
+ // Max global off heap memory that can be used for all memstores
+ // This should be an absolute value in MBs and not percent.
+ public static final String OFFHEAP_MEMSTORE_SIZE_KEY =
+ "hbase.regionserver.offheap.global.memstore.size";
+
+ public static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
+ // Default lower water mark limit is 95% size of memstore size.
+ public static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
+
+ private static final Log LOG = LogFactory.getLog(MemorySizeUtil.class);
+ // a constant to convert a fraction to a percentage
+ private static final int CONVERT_TO_PERCENTAGE = 100;
+
+ /**
+ * Checks whether we have enough heap memory left out after portion for Memstore and Block cache.
+ * We need atleast 20% of heap left out for other RS functions.
+ * @param conf
+ */
+ public static void checkForClusterFreeHeapMemoryLimit(Configuration conf) {
+ if (conf.get(MEMSTORE_SIZE_OLD_KEY) != null) {
+ LOG.warn(MEMSTORE_SIZE_OLD_KEY + " is deprecated by " + MEMSTORE_SIZE_KEY);
+ }
+ float globalMemstoreSize = getGlobalMemStoreHeapPercent(conf, false);
+ int gml = (int)(globalMemstoreSize * CONVERT_TO_PERCENTAGE);
+ float blockCacheUpperLimit = getBlockCacheHeapPercent(conf);
+ int bcul = (int)(blockCacheUpperLimit * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul)
+ < (int)(CONVERT_TO_PERCENTAGE *
+ HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) {
+ throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+ + "the threshold required for successful cluster operation. "
+ + "The combined value cannot exceed 0.8. Please check "
+ + "the settings for hbase.regionserver.global.memstore.size and "
+ + "hfile.block.cache.size in your configuration. "
+ + "hbase.regionserver.global.memstore.size is " + globalMemstoreSize
+ + " hfile.block.cache.size is " + blockCacheUpperLimit);
+ }
+ }
+
+ /**
+ * Retrieve global memstore configured size as percentage of total heap.
+ * @param c
+ * @param logInvalid
+ */
+ public static float getGlobalMemStoreHeapPercent(final Configuration c,
+ final boolean logInvalid) {
+ float limit = c.getFloat(MEMSTORE_SIZE_KEY,
+ c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
+ if (limit > 0.8f || limit <= 0.0f) {
+ if (logInvalid) {
+ LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
+ + " because supplied value outside allowed range of (0 -> 0.8]");
+ }
+ limit = DEFAULT_MEMSTORE_SIZE;
+ }
+ return limit;
+ }
+
+ /**
+ * Retrieve configured size for global memstore lower water mark as fraction of global memstore
+ * size.
+ */
+ public static float getGlobalMemStoreHeapLowerMark(final Configuration conf,
+ boolean honorOldConfig) {
+ String lowMarkPercentStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+ if (lowMarkPercentStr != null) {
+ float lowMarkPercent = Float.parseFloat(lowMarkPercentStr);
+ if (lowMarkPercent > 1.0f) {
+ LOG.error("Bad configuration value for " + MEMSTORE_SIZE_LOWER_LIMIT_KEY + ": "
+ + lowMarkPercent + ". Using 1.0f instead.");
+ lowMarkPercent = 1.0f;
+ }
+ return lowMarkPercent;
+ }
+ if (!honorOldConfig) return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
+ String lowerWaterMarkOldValStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
+ if (lowerWaterMarkOldValStr != null) {
+ LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
+ + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+ float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
+ float upperMarkPercent = getGlobalMemStoreHeapPercent(conf, false);
+ if (lowerWaterMarkOldVal > upperMarkPercent) {
+ lowerWaterMarkOldVal = upperMarkPercent;
+ LOG.error("Value of " + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " (" + lowerWaterMarkOldVal
+ + ") is greater than global memstore limit (" + upperMarkPercent + ") set by "
+ + MEMSTORE_SIZE_KEY + "/" + MEMSTORE_SIZE_OLD_KEY + ". Setting memstore lower limit "
+ + "to " + upperMarkPercent);
+ }
+ return lowerWaterMarkOldVal / upperMarkPercent;
+ }
+ return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
+ }
+
+ /**
+ * @return Pair of global memstore size and memory type(ie. on heap or off heap).
+ */
+ public static Pair<Long, MemoryType> getGlobalMemstoreSize(Configuration conf) {
+ long offheapMSGlobal = conf.getLong(OFFHEAP_MEMSTORE_SIZE_KEY, 0);// Size in MBs
+ if (offheapMSGlobal > 0) {
+ // Off heap memstore size has not relevance when MSLAB is turned OFF. We will go with making
+ // this entire size split into Chunks and pooling them in MemstoreLABPoool. We dont want to
+ // create so many on demand off heap chunks. In fact when this off heap size is configured, we
+ // will go with 100% of this size as the pool size
+ if (MemStoreLAB.isEnabled(conf)) {
+ // We are in offheap Memstore use
+ long globalMemStoreLimit = (long) (offheapMSGlobal * 1024 * 1024); // Size in bytes
+ return new Pair<Long, MemoryType>(globalMemStoreLimit, MemoryType.NON_HEAP);
+ } else {
+ // Off heap max memstore size is configured with turning off MSLAB. It makes no sense. Do a
+ // warn log and go with on heap memstore percentage. By default it will be 40% of Xmx
+ LOG.warn("There is no relevance of configuring '" + OFFHEAP_MEMSTORE_SIZE_KEY + "' when '"
+ + MemStoreLAB.USEMSLAB_KEY + "' is turned off."
+ + " Going with on heap global memstore size ('" + MEMSTORE_SIZE_KEY + "')");
+ }
+ }
+ long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+ float globalMemStorePercent = getGlobalMemStoreHeapPercent(conf, true);
+ return new Pair<Long, MemoryType>((long) (max * globalMemStorePercent), MemoryType.HEAP);
+ }
+
+ /**
+ * Retrieve configured size for on heap block cache as percentage of total heap.
+ * @param conf
+ */
+ public static float getBlockCacheHeapPercent(final Configuration conf) {
+ // L1 block cache is always on heap
+ float l1CachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
+ HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+ float l2CachePercent = getL2BlockCacheHeapPercent(conf);
+ return l1CachePercent + l2CachePercent;
+ }
+
+ /**
+ * @param conf
+ * @return The on heap size for L2 block cache.
+ */
+ public static float getL2BlockCacheHeapPercent(Configuration conf) {
+ float l2CachePercent = 0.0F;
+ String bucketCacheIOEngineName = conf.get(HConstants.BUCKET_CACHE_IOENGINE_KEY, null);
+ // L2 block cache can be on heap when IOEngine is "heap"
+ if (bucketCacheIOEngineName != null && bucketCacheIOEngineName.startsWith("heap")) {
+ float bucketCachePercentage = conf.getFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0F);
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ l2CachePercent = bucketCachePercentage < 1 ? bucketCachePercentage
+ : (bucketCachePercentage * 1024 * 1024) / mu.getMax();
+ }
+ return l2CachePercent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
index d968ed9..2cbf0a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -17,34 +17,34 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
/**
* A chunk of memory out of which allocations are sliced.
*/
@InterfaceAudience.Private
-public class Chunk {
+public abstract class Chunk {
/** Actual underlying data */
- private byte[] data;
+ protected ByteBuffer data;
- private static final int UNINITIALIZED = -1;
- private static final int OOM = -2;
+ protected static final int UNINITIALIZED = -1;
+ protected static final int OOM = -2;
/**
* Offset for the next allocation, or the sentinel value -1 which implies that the chunk is still
* uninitialized.
*/
- private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+ protected AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
/** Total number of allocations satisfied from this buffer */
- private AtomicInteger allocCount = new AtomicInteger();
+ protected AtomicInteger allocCount = new AtomicInteger();
/** Size of chunk in bytes */
- private final int size;
+ protected final int size;
/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
@@ -60,23 +60,7 @@ public class Chunk {
* constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
* until the allocation is complete.
*/
- public void init() {
- assert nextFreeOffset.get() == UNINITIALIZED;
- try {
- if (data == null) {
- data = new byte[size];
- }
- } catch (OutOfMemoryError e) {
- boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
- assert failInit; // should be true.
- throw e;
- }
- // Mark that it's ready for use
- boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
- // We should always succeed the above CAS since only one thread
- // calls init()!
- Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
- }
+ public abstract void init();
/**
* Reset the offset to UNINITIALIZED before before reusing an old chunk
@@ -109,7 +93,7 @@ public class Chunk {
return -1;
}
- if (oldOffset + size > data.length) {
+ if (oldOffset + size > data.capacity()) {
return -1; // alloc doesn't fit
}
@@ -126,14 +110,14 @@ public class Chunk {
/**
* @return This chunk's backing data.
*/
- byte[] getData() {
+ ByteBuffer getData() {
return this.data;
}
@Override
public String toString() {
return "Chunk@" + System.identityHashCode(this) + " allocs=" + allocCount.get() + "waste="
- + (data.length - nextFreeOffset.get());
+ + (data.capacity() - nextFreeOffset.get());
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
index 1d237d0..1c7dfe2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
@@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
import org.apache.hadoop.hbase.util.RollingStatCalculator;
@@ -109,6 +109,9 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
private float globalMemStorePercentMaxRange;
private float blockCachePercentMinRange;
private float blockCachePercentMaxRange;
+
+ private float globalMemStoreLimitLowMarkPercent;
+
// Store statistics about the corresponding parameters for memory tuning
private RollingStatCalculator rollingStatsForCacheMisses;
private RollingStatCalculator rollingStatsForFlushes;
@@ -165,11 +168,9 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
newTuneDirection = StepDirection.NEUTRAL;
}
// Increase / decrease the memstore / block cahce sizes depending on new tuner step.
- float globalMemstoreLowerMark = HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf,
- curMemstoreSize);
// We don't want to exert immediate pressure on memstore. So, we decrease its size gracefully;
// we set a minimum bar in the middle of the total memstore size and the lower limit.
- float minMemstoreSize = ((globalMemstoreLowerMark + 1) * curMemstoreSize) / 2.00f;
+ float minMemstoreSize = ((globalMemStoreLimitLowMarkPercent + 1) * curMemstoreSize) / 2.00f;
switch (newTuneDirection) {
case INCREASE_BLOCK_CACHE_SIZE:
@@ -365,9 +366,11 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
this.globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
- HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+ MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
- HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+ MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
+ this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf,
+ true);
// Default value of periods to ignore is number of lookup periods
this.numPeriodsToIgnore = conf.getInt(NUM_PERIODS_TO_IGNORE, this.tunerLookupPeriods);
this.rollingStatsForCacheMisses = new RollingStatCalculator(this.tunerLookupPeriods);
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f074b0e..44350c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7028,7 +7028,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this
.memstoreFlushSize)));
- stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
+ if (rsServices.getHeapMemoryManager() != null) {
+ stats.setHeapOccupancy(
+ (int) rsServices.getHeapMemoryManager().getHeapOccupancyPercent() * 100);
+ }
stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 :
(int)rsServices.getCompactionPressure()*100);
return stats.build();
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8e78422..56fc6eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@@ -170,6 +172,7 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
@@ -516,6 +519,7 @@ public class HRegionServer extends HasThread implements
super("RegionServer"); // thread name
this.fsOk = true;
this.conf = conf;
+ MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
HFile.checkHFileVersion(this.conf);
checkCodecs(this.conf);
this.userProvider = UserProvider.instantiate(conf);
@@ -1451,6 +1455,8 @@ public class HRegionServer extends HasThread implements
startServiceThreads();
startHeapMemoryManager();
+ // Call it after starting HeapMemoryManager.
+ initializeMemStoreChunkPool();
LOG.info("Serving as " + this.serverName +
", RpcServer on " + rpcServices.isa +
", sessionid=0x" +
@@ -1470,16 +1476,34 @@ public class HRegionServer extends HasThread implements
}
}
+ private void initializeMemStoreChunkPool() {
+ if (MemStoreLAB.isEnabled(conf)) {
+ // MSLAB is enabled. So initialize MemStoreChunkPool
+ // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
+ // it.
+ Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
+ long globalMemStoreSize = pair.getFirst();
+ boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
+ // When off heap memstore in use, take full area for chunk pool.
+ float poolSizePercentage = offheap ? 1.0F
+ : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
+ float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
+ MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
+ int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
+ MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
+ initialCountPercentage, chunkSize, offheap);
+ if (pool != null && this.hMemManager != null) {
+ // Register with Heap Memory manager
+ this.hMemManager.registerTuneObserver(pool);
+ }
+ }
+ }
+
private void startHeapMemoryManager() {
- this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
- this, this.regionServerAccounting);
+ this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
+ this.regionServerAccounting);
if (this.hMemManager != null) {
this.hMemManager.start(getChoreService());
- MemStoreChunkPool chunkPool = MemStoreChunkPool.getPool(this.conf);
- if (chunkPool != null) {
- // Register it as HeapMemoryTuneObserver
- this.hMemManager.registerTuneObserver(chunkPool);
- }
}
}
@@ -3523,11 +3547,6 @@ public class HRegionServer extends HasThread implements
}
@Override
- public HeapMemoryManager getHeapMemoryManager() {
- return hMemManager;
- }
-
- @Override
public double getCompactionPressure() {
double max = 0;
for (Region region : onlineRegions.values()) {
@@ -3541,6 +3560,11 @@ public class HRegionServer extends HasThread implements
return max;
}
+ @Override
+ public HeapMemoryManager getHeapMemoryManager() {
+ return hMemManager;
+ }
+
/**
* For testing
* @return whether all wal roll request finished for this regionserver
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
deleted file mode 100644
index 99b2bb6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * A memstore-local allocation buffer.
- * <p>
- * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
- * big (2MB) byte[] chunks from and then doles it out to threads that request
- * slices into the array.
- * <p>
- * The purpose of this class is to combat heap fragmentation in the
- * regionserver. By ensuring that all KeyValues in a given memstore refer
- * only to large chunks of contiguous memory, we ensure that large blocks
- * get freed up when the memstore is flushed.
- * <p>
- * Without the MSLAB, the byte array allocated during insertion end up
- * interleaved throughout the heap, and the old generation gets progressively
- * more fragmented until a stop-the-world compacting collection occurs.
- * <p>
- * TODO: we should probably benchmark whether word-aligning the allocations
- * would provide a performance improvement - probably would speed up the
- * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
- * anyway
- */
-@InterfaceAudience.Private
-public class HeapMemStoreLAB implements MemStoreLAB {
-
- static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
- static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
- static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
- static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
- // allocator
-
- static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
-
- private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
- // A queue of chunks from pool contained by this memstore LAB
- // TODO: in the future, it would be better to have List implementation instead of Queue,
- // as FIFO order is not so important here
- @VisibleForTesting
- BlockingQueue<PooledChunk> pooledChunkQueue = null;
- private final int chunkSize;
- private final int maxAlloc;
- private final MemStoreChunkPool chunkPool;
-
- // This flag is for closing this instance, its set when clearing snapshot of
- // memstore
- private volatile boolean closed = false;
- // This flag is for reclaiming chunks. Its set when putting chunks back to
- // pool
- private AtomicBoolean reclaimed = new AtomicBoolean(false);
- // Current count of open scanners which reading data from this MemStoreLAB
- private final AtomicInteger openScannerCount = new AtomicInteger();
-
- // Used in testing
- public HeapMemStoreLAB() {
- this(new Configuration());
- }
-
- public HeapMemStoreLAB(Configuration conf) {
- chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
- maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
- this.chunkPool = MemStoreChunkPool.getPool(conf);
- // currently chunkQueue is only used for chunkPool
- if (this.chunkPool != null) {
- // set queue length to chunk pool max count to avoid keeping reference of
- // too many non-reclaimable chunks
- pooledChunkQueue = new LinkedBlockingQueue<PooledChunk>(chunkPool.getMaxCount());
- }
-
- // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
- Preconditions.checkArgument(maxAlloc <= chunkSize,
- MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
- }
-
-
- @Override
- public Cell copyCellInto(Cell cell) {
- int size = KeyValueUtil.length(cell);
- Preconditions.checkArgument(size >= 0, "negative size");
- // Callers should satisfy large allocations directly from JVM since they
- // don't cause fragmentation as badly.
- if (size > maxAlloc) {
- return null;
- }
- Chunk c = null;
- int allocOffset = 0;
- while (true) {
- c = getOrMakeChunk();
- // Try to allocate from this chunk
- allocOffset = c.alloc(size);
- if (allocOffset != -1) {
- // We succeeded - this is the common case - small alloc
- // from a big buffer
- break;
- }
- // not enough space!
- // try to retire this chunk
- tryRetireChunk(c);
- }
- return KeyValueUtil.copyCellTo(cell, c.getData(), allocOffset, size);
- }
-
- /**
- * Close this instance since it won't be used any more, try to put the chunks
- * back to pool
- */
- @Override
- public void close() {
- this.closed = true;
- // We could put back the chunks to pool for reusing only when there is no
- // opening scanner which will read their data
- if (chunkPool != null && openScannerCount.get() == 0
- && reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.pooledChunkQueue);
- }
- }
-
- /**
- * Called when opening a scanner on the data of this MemStoreLAB
- */
- @Override
- public void incScannerCount() {
- this.openScannerCount.incrementAndGet();
- }
-
- /**
- * Called when closing a scanner on the data of this MemStoreLAB
- */
- @Override
- public void decScannerCount() {
- int count = this.openScannerCount.decrementAndGet();
- if (this.closed && chunkPool != null && count == 0
- && reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.pooledChunkQueue);
- }
- }
-
- /**
- * Try to retire the current chunk if it is still
- * <code>c</code>. Postcondition is that curChunk.get()
- * != c
- * @param c the chunk to retire
- * @return true if we won the race to retire the chunk
- */
- private void tryRetireChunk(Chunk c) {
- curChunk.compareAndSet(c, null);
- // If the CAS succeeds, that means that we won the race
- // to retire the chunk. We could use this opportunity to
- // update metrics on external fragmentation.
- //
- // If the CAS fails, that means that someone else already
- // retired the chunk for us.
- }
-
- /**
- * Get the current chunk, or, if there is no current chunk,
- * allocate a new one from the JVM.
- */
- private Chunk getOrMakeChunk() {
- while (true) {
- // Try to get the chunk
- Chunk c = curChunk.get();
- if (c != null) {
- return c;
- }
-
- // No current chunk, so we want to allocate one. We race
- // against other allocators to CAS in an uninitialized chunk
- // (which is cheap to allocate)
- if (chunkPool != null) {
- c = chunkPool.getChunk();
- }
- boolean pooledChunk = false;
- if (c != null) {
- // This is chunk from pool
- pooledChunk = true;
- } else {
- c = new Chunk(chunkSize);
- }
- if (curChunk.compareAndSet(null, c)) {
- // we won race - now we need to actually do the expensive
- // allocation step
- c.init();
- if (pooledChunk) {
- if (!this.closed && !this.pooledChunkQueue.offer((PooledChunk) c)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
- + pooledChunkQueue.size());
- }
- }
- }
- return c;
- } else if (pooledChunk) {
- chunkPool.putbackChunk((PooledChunk) c);
- }
- // someone else won race - that's fine, we'll try to grab theirs
- // in the next iteration of the loop.
- }
- }
-
- @VisibleForTesting
- Chunk getCurrentChunk() {
- return this.curChunk.get();
- }
-
-
- BlockingQueue<PooledChunk> getPooledChunks() {
- return this.pooledChunkQueue;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 7646293..a2f546a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -36,13 +36,15 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
/**
- * Manages tuning of Heap memory using <code>HeapMemoryTuner</code>.
+ * Manages tuning of Heap memory using <code>HeapMemoryTuner</code>. Most part of the heap memory is
+ * split between Memstores and BlockCache. This manager helps in tuning sizes of both these
+ * dynamically, as per the R/W load on the servers.
*/
@InterfaceAudience.Private
public class HeapMemoryManager {
@@ -91,7 +93,7 @@ public class HeapMemoryManager {
private List<HeapMemoryTuneObserver> tuneObservers = new ArrayList<HeapMemoryTuneObserver>();
public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
- Server server, RegionServerAccounting regionServerAccounting) {
+ Server server, RegionServerAccounting regionServerAccounting) {
ResizableBlockCache l1Cache = CacheConfig.getL1(conf);
if (l1Cache != null) {
return new HeapMemoryManager(l1Cache, memStoreFlusher, server, regionServerAccounting);
@@ -117,10 +119,10 @@ public class HeapMemoryManager {
private boolean doInit(Configuration conf) {
boolean tuningEnabled = true;
- globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false);
+ globalMemStorePercent = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
- HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
+ MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(conf);
// Initialize max and min range for memstore heap space
globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
globalMemStorePercent);
@@ -128,14 +130,14 @@ public class HeapMemoryManager {
globalMemStorePercent);
if (globalMemStorePercent < globalMemStorePercentMinRange) {
LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
- + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+ + ", same value as " + MemorySizeUtil.MEMSTORE_SIZE_KEY
+ " because supplied value greater than initial memstore size value.");
globalMemStorePercentMinRange = globalMemStorePercent;
conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
}
if (globalMemStorePercent > globalMemStorePercentMaxRange) {
LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
- + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+ + ", same value as " + MemorySizeUtil.MEMSTORE_SIZE_KEY
+ " because supplied value less than initial memstore size value.");
globalMemStorePercentMaxRange = globalMemStorePercent;
conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
@@ -167,7 +169,7 @@ public class HeapMemoryManager {
}
int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
- this.l2BlockCachePercent = HeapMemorySizeUtil.getL2BlockCacheHeapPercent(conf);
+ this.l2BlockCachePercent = MemorySizeUtil.getL2BlockCacheHeapPercent(conf);
int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
@@ -340,7 +342,7 @@ public class HeapMemoryManager {
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
+ "the threshold required for successful cluster operation. "
- + "The combined value cannot exceed 0.8. " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+ + "The combined value cannot exceed 0.8. " + MemorySizeUtil.MEMSTORE_SIZE_KEY
+ " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
+ blockCacheSize);
// TODO can adjust the value so as not exceed 80%. Is that correct? may be.
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index db2cd18..926dd7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.lang.management.ManagementFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -29,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.util.StringUtils;
@@ -45,7 +42,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* collection on JVM.
*
* The pool instance is globally unique and could be obtained through
- * {@link MemStoreChunkPool#getPool(Configuration)}
+ * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)}
*
* {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
* bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
@@ -55,10 +52,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.Private
public class MemStoreChunkPool implements HeapMemoryTuneObserver {
private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
- final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
- final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
- final static float POOL_MAX_SIZE_DEFAULT = 1.0f;
- final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
// Static reference to the MemStoreChunkPool
static MemStoreChunkPool GLOBAL_INSTANCE;
@@ -68,7 +61,7 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
private int maxCount;
// A queue of reclaimed chunks
- private final BlockingQueue<PooledChunk> reclaimedChunks;
+ private final BlockingQueue<Chunk> reclaimedChunks;
private final int chunkSize;
private final float poolSizePercentage;
@@ -78,15 +71,17 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
private static final int statThreadPeriod = 60 * 5;
private final AtomicLong chunkCount = new AtomicLong();
private final AtomicLong reusedChunkCount = new AtomicLong();
+ private final boolean offheap;
- MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
- int initialCount, float poolSizePercentage) {
+ MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage,
+ boolean offheap) {
this.maxCount = maxCount;
this.chunkSize = chunkSize;
this.poolSizePercentage = poolSizePercentage;
- this.reclaimedChunks = new LinkedBlockingQueue<PooledChunk>();
+ this.offheap = offheap;
+ this.reclaimedChunks = new LinkedBlockingQueue<>();
for (int i = 0; i < initialCount; i++) {
- PooledChunk chunk = new PooledChunk(chunkSize);
+ Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize);
chunk.init();
reclaimedChunks.add(chunk);
}
@@ -108,8 +103,8 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
* @see #putbackChunk(Chunk)
* @see #putbackChunks(BlockingQueue)
*/
- PooledChunk getChunk() {
- PooledChunk chunk = reclaimedChunks.poll();
+ Chunk getChunk() {
+ Chunk chunk = reclaimedChunks.poll();
if (chunk != null) {
chunk.reset();
reusedChunkCount.incrementAndGet();
@@ -118,7 +113,7 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
while (true) {
long created = this.chunkCount.get();
if (created < this.maxCount) {
- chunk = new PooledChunk(chunkSize);
+ chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize);
if (this.chunkCount.compareAndSet(created, created + 1)) {
break;
}
@@ -135,9 +130,9 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
* skip the remaining chunks
* @param chunks
*/
- synchronized void putbackChunks(BlockingQueue<PooledChunk> chunks) {
+ synchronized void putbackChunks(BlockingQueue<Chunk> chunks) {
int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
- PooledChunk chunk = null;
+ Chunk chunk = null;
while ((chunk = chunks.poll()) != null && toAdd > 0) {
reclaimedChunks.add(chunk);
toAdd--;
@@ -149,7 +144,7 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
* skip it
* @param chunk
*/
- synchronized void putbackChunk(PooledChunk chunk) {
+ synchronized void putbackChunk(Chunk chunk) {
if (reclaimedChunks.size() < this.maxCount) {
reclaimedChunks.add(chunk);
}
@@ -191,51 +186,41 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
}
/**
- * @param conf
* @return the global MemStoreChunkPool instance
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DC_DOUBLECHECK",
- justification="Intentional")
- static MemStoreChunkPool getPool(Configuration conf) {
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
+ justification = "Method is called by single thread at the starting of RS")
+ static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
+ float initialCountPercentage, int chunkSize, boolean offheap) {
if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
+ if (chunkPoolDisabled) return null;
- synchronized (MemStoreChunkPool.class) {
- if (chunkPoolDisabled) return null;
- if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
- // When MSLAB is turned OFF no need to init chunk pool at all.
- if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
- chunkPoolDisabled = true;
- return null;
- }
- float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
- if (poolSizePercentage <= 0) {
- chunkPoolDisabled = true;
- return null;
- }
- if (poolSizePercentage > 1.0) {
- throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
- }
- long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
- long globalMemStoreLimit = (long) (heapMax * HeapMemorySizeUtil.getGlobalMemStorePercent(conf,
- false));
- int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY,
- HeapMemStoreLAB.CHUNK_SIZE_DEFAULT);
- int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
-
- float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
- POOL_INITIAL_SIZE_DEFAULT);
- if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
- throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
- + " must be between 0.0 and 1.0");
- }
-
- int initialCount = (int) (initialCountPercentage * maxCount);
- LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
- + ", max count " + maxCount + ", initial count " + initialCount);
- GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount,
- poolSizePercentage);
- return GLOBAL_INSTANCE;
+ if (poolSizePercentage <= 0) {
+ chunkPoolDisabled = true;
+ return null;
+ }
+ if (poolSizePercentage > 1.0) {
+ throw new IllegalArgumentException(
+ MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
+ }
+ int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
+ if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+ throw new IllegalArgumentException(
+ MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
}
+ int initialCount = (int) (initialCountPercentage * maxCount);
+ LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
+ + ", max count " + maxCount + ", initial count " + initialCount);
+ GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
+ offheap);
+ return GLOBAL_INSTANCE;
+ }
+
+ /**
+ * @return The singleton instance of this pool.
+ */
+ static MemStoreChunkPool getPool() {
+ return GLOBAL_INSTANCE;
}
int getMaxCount() {
@@ -247,12 +232,6 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
chunkPoolDisabled = false;
}
- public static class PooledChunk extends Chunk {
- PooledChunk(int size) {
- super(size);
- }
- }
-
@Override
public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 2f4d225..15cf97c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -24,7 +24,7 @@ import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@@ -49,11 +49,12 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
@@ -109,12 +110,21 @@ class MemStoreFlusher implements FlushRequester {
this.conf = conf;
this.server = server;
this.threadWakeFrequency =
- conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
- long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
- float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
- this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
- this.globalMemStoreLimitLowMarkPercent =
- HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
+ conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+ Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
+ this.globalMemStoreLimit = pair.getFirst();
+ boolean onheap = pair.getSecond() == MemoryType.HEAP;
+ // When off heap memstore in use we configure the global off heap space for memstore as bytes
+ // not as % of max memory size. In such case, the lower water mark should be specified using the
+ // key "hbase.regionserver.global.memstore.size.lower.limit" which says % of the global upper
+ // bound and defaults to 95%. In on heap case also specifying this way is ideal. But in the past
+ // we used to take lower bound also as the % of xmx (38% as default). For backward compatibility
+ // for this deprecated config,we will fall back to read that config when new one is missing.
+ // Only for on heap case, do this fallback mechanism. For off heap it makes no sense.
+ // TODO When to get rid of the deprecated config? ie
+ // "hbase.regionserver.global.memstore.lowerLimit". Can get rid of this boolean passing then.
+ this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf,
+ onheap);
this.globalMemStoreLimitLowMark =
(long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
@@ -126,7 +136,7 @@ class MemStoreFlusher implements FlushRequester {
+ TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
+ ", globalMemStoreLimitLowMark="
+ TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
- + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
+ + ", Offheap=" + !onheap);
}
public LongAdder getUpdatesBlockedMsHighWater() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index 706e243..f6d1607 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* A memstore-local allocation buffer.
@@ -46,6 +48,19 @@ public interface MemStoreLAB {
String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
boolean USEMSLAB_DEFAULT = true;
+ String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
+
+ String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
+ int CHUNK_SIZE_DEFAULT = 2048 * 1024;
+ String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
+ int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
+ // allocator
+
+ // MSLAB pool related configs
+ String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
+ String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
+ float POOL_MAX_SIZE_DEFAULT = 1.0f;
+ float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
/**
* Allocates slice in this LAB and copy the passed Cell into this area. Returns new Cell instance
@@ -68,4 +83,17 @@ public interface MemStoreLAB {
*/
void decScannerCount();
+ public static MemStoreLAB newInstance(Configuration conf) {
+ MemStoreLAB memStoreLAB = null;
+ if (isEnabled(conf)) {
+ String className = conf.get(MSLAB_CLASS_NAME, MemStoreLABImpl.class.getName());
+ memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className,
+ new Class[] { Configuration.class }, new Object[] { conf });
+ }
+ return memStoreLAB;
+ }
+
+ public static boolean isEnabled(Configuration conf) {
+ return conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
new file mode 100644
index 0000000..30e4311
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -0,0 +1,243 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * A memstore-local allocation buffer.
+ * <p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
+ * big (2MB) byte[] chunks from and then doles it out to threads that request
+ * slices into the array.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * regionserver. By ensuring that all Cells in a given memstore refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the memstore is flushed.
+ * <p>
+ * Without the MSLAB, the byte array allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * TODO: we should probably benchmark whether word-aligning the allocations
+ * would provide a performance improvement - probably would speed up the
+ * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
+ * anyway.
+ * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}.
+ * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks,
+ * which this MemStoreLAB creates on its own (when no chunk available from pool), those will be
+ * always on heap backed.
+ */
+@InterfaceAudience.Private
+public class MemStoreLABImpl implements MemStoreLAB {
+
+ static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class);
+
+ private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+ // A queue of chunks from pool contained by this memstore LAB
+ // TODO: in the future, it would be better to have List implementation instead of Queue,
+ // as FIFO order is not so important here
+ @VisibleForTesting
+ BlockingQueue<Chunk> pooledChunkQueue = null;
+ private final int chunkSize;
+ private final int maxAlloc;
+ private final MemStoreChunkPool chunkPool;
+
+ // This flag is for closing this instance, its set when clearing snapshot of
+ // memstore
+ private volatile boolean closed = false;
+ // This flag is for reclaiming chunks. Its set when putting chunks back to
+ // pool
+ private AtomicBoolean reclaimed = new AtomicBoolean(false);
+ // Current count of open scanners which reading data from this MemStoreLAB
+ private final AtomicInteger openScannerCount = new AtomicInteger();
+
+ // Used in testing
+ public MemStoreLABImpl() {
+ this(new Configuration());
+ }
+
+ public MemStoreLABImpl(Configuration conf) {
+ chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
+ maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
+ this.chunkPool = MemStoreChunkPool.getPool();
+ // currently chunkQueue is only used for chunkPool
+ if (this.chunkPool != null) {
+ // set queue length to chunk pool max count to avoid keeping reference of
+ // too many non-reclaimable chunks
+ pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
+ }
+
+ // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
+ Preconditions.checkArgument(maxAlloc <= chunkSize,
+ MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
+ }
+
+
+ @Override
+ public Cell copyCellInto(Cell cell) {
+ int size = KeyValueUtil.length(cell);
+ Preconditions.checkArgument(size >= 0, "negative size");
+ // Callers should satisfy large allocations directly from JVM since they
+ // don't cause fragmentation as badly.
+ if (size > maxAlloc) {
+ return null;
+ }
+ Chunk c = null;
+ int allocOffset = 0;
+ while (true) {
+ c = getOrMakeChunk();
+ // Try to allocate from this chunk
+ allocOffset = c.alloc(size);
+ if (allocOffset != -1) {
+ // We succeeded - this is the common case - small alloc
+ // from a big buffer
+ break;
+ }
+ // not enough space!
+ // try to retire this chunk
+ tryRetireChunk(c);
+ }
+ return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
+ }
+
+ /**
+ * Close this instance since it won't be used any more, try to put the chunks
+ * back to pool
+ */
+ @Override
+ public void close() {
+ this.closed = true;
+ // We could put back the chunks to pool for reusing only when there is no
+ // opening scanner which will read their data
+ if (chunkPool != null && openScannerCount.get() == 0
+ && reclaimed.compareAndSet(false, true)) {
+ chunkPool.putbackChunks(this.pooledChunkQueue);
+ }
+ }
+
+ /**
+ * Called when opening a scanner on the data of this MemStoreLAB
+ */
+ @Override
+ public void incScannerCount() {
+ this.openScannerCount.incrementAndGet();
+ }
+
+ /**
+ * Called when closing a scanner on the data of this MemStoreLAB
+ */
+ @Override
+ public void decScannerCount() {
+ int count = this.openScannerCount.decrementAndGet();
+ if (this.closed && chunkPool != null && count == 0
+ && reclaimed.compareAndSet(false, true)) {
+ chunkPool.putbackChunks(this.pooledChunkQueue);
+ }
+ }
+
+ /**
+ * Try to retire the current chunk if it is still
+ * <code>c</code>. Postcondition is that curChunk.get()
+ * != c
+ * @param c the chunk to retire
+ * @return true if we won the race to retire the chunk
+ */
+ private void tryRetireChunk(Chunk c) {
+ curChunk.compareAndSet(c, null);
+ // If the CAS succeeds, that means that we won the race
+ // to retire the chunk. We could use this opportunity to
+ // update metrics on external fragmentation.
+ //
+ // If the CAS fails, that means that someone else already
+ // retired the chunk for us.
+ }
+
+ /**
+ * Get the current chunk, or, if there is no current chunk,
+ * allocate a new one from the JVM.
+ */
+ private Chunk getOrMakeChunk() {
+ while (true) {
+ // Try to get the chunk
+ Chunk c = curChunk.get();
+ if (c != null) {
+ return c;
+ }
+
+ // No current chunk, so we want to allocate one. We race
+ // against other allocators to CAS in an uninitialized chunk
+ // (which is cheap to allocate)
+ if (chunkPool != null) {
+ c = chunkPool.getChunk();
+ }
+ boolean pooledChunk = false;
+ if (c != null) {
+ // This is chunk from pool
+ pooledChunk = true;
+ } else {
+ c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap.
+ }
+ if (curChunk.compareAndSet(null, c)) {
+ // we won race - now we need to actually do the expensive
+ // allocation step
+ c.init();
+ if (pooledChunk) {
+ if (!this.closed && !this.pooledChunkQueue.offer(c)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+ + pooledChunkQueue.size());
+ }
+ }
+ }
+ return c;
+ } else if (pooledChunk) {
+ chunkPool.putbackChunk(c);
+ }
+ // someone else won race - that's fine, we'll try to grab theirs
+ // in the next iteration of the loop.
+ }
+ }
+
+ @VisibleForTesting
+ Chunk getCurrentChunk() {
+ return this.curChunk.get();
+ }
+
+
+ BlockingQueue<Chunk> getPooledChunks() {
+ return this.pooledChunkQueue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
new file mode 100644
index 0000000..ed98cfa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An off heap chunk implementation.
+ */
+@InterfaceAudience.Private
+public class OffheapChunk extends Chunk {
+
+ OffheapChunk(int size) {
+ super(size);
+ }
+
+ @Override
+ public void init() {
+ assert nextFreeOffset.get() == UNINITIALIZED;
+ try {
+ if (data == null) {
+ data = ByteBuffer.allocateDirect(this.size);
+ }
+ } catch (OutOfMemoryError e) {
+ boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+ assert failInit; // should be true.
+ throw e;
+ }
+ // Mark that it's ready for use
+ boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+ // We should always succeed the above CAS since only one thread
+ // calls init()!
+ Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
new file mode 100644
index 0000000..bd33cb5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An on heap chunk implementation.
+ */
+@InterfaceAudience.Private
+public class OnheapChunk extends Chunk {
+
+ OnheapChunk(int size) {
+ super(size);
+ }
+
+ public void init() {
+ assert nextFreeOffset.get() == UNINITIALIZED;
+ try {
+ if (data == null) {
+ data = ByteBuffer.allocate(this.size);
+ }
+ } catch (OutOfMemoryError e) {
+ boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+ assert failInit; // should be true.
+ throw e;
+ }
+ // Mark that it's ready for use
+ boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+ // We should always succeed the above CAS since only one thread
+ // calls init()!
+ Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 554b29a..a61a9f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -45,7 +45,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.ByteBufferCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -1174,8 +1174,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Since byte buffers can point all kinds of crazy places it's harder to keep track
// of which blocks are kept alive by what byte buffer.
// So we make a guess.
- if (c instanceof ByteBufferedCell) {
- ByteBufferedCell bbCell = (ByteBufferedCell) c;
+ if (c instanceof ByteBufferCell) {
+ ByteBufferCell bbCell = (ByteBufferCell) c;
ByteBuffer bb = bbCell.getValueByteBuffer();
if (bb != lastBlock) {
context.incrementResponseBlockSize(bb.capacity());
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index fa8860a..01e07ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,8 +34,6 @@ import java.util.List;
@InterfaceAudience.Private
public final class SegmentFactory {
- static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
-
private SegmentFactory() {}
private static SegmentFactory instance = new SegmentFactory();
@@ -47,7 +44,7 @@ public final class SegmentFactory {
// create skip-list-based (non-flat) immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator) {
- return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf));
+ return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
}
// create new flat immutable segment from compacting old immutable segments
@@ -57,7 +54,7 @@ public final class SegmentFactory {
throws IOException {
Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
"wrong immutable segment type");
- MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+ MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return
// the last parameter "false" means not to merge, but to compact the pipeline
// in order to create the new segment
@@ -77,7 +74,7 @@ public final class SegmentFactory {
// create mutable segment
public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) {
- MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+ MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return generateMutableSegment(conf, comparator, memStoreLAB);
}
@@ -103,16 +100,6 @@ public final class SegmentFactory {
return new MutableSegment(set, comparator, memStoreLAB);
}
- private MemStoreLAB getMemStoreLAB(Configuration conf) {
- MemStoreLAB memStoreLAB = null;
- if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
- String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
- memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className,
- new Class[] { Configuration.class }, new Object[] { conf });
- }
- return memStoreLAB;
- }
-
private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
for (ImmutableSegment segment : segments) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index aa57881..e7c4f98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -386,8 +386,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
this.logrollsize = (long) (blocksize
* conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
- float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat(
- HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
+ float memstoreRatio = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
if (maxLogsDefined) {
LOG.warn("'hbase.regionserver.maxlogs' was deprecated.");
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
index 059d717..48a5b6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
@@ -27,7 +27,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TestCellUtil.ByteBufferedCellImpl;
+import org.apache.hadoop.hbase.TestCellUtil.ByteBufferCellImpl;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.testclassification.FilterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -108,7 +108,7 @@ public class TestSingleColumnValueFilter {
assertTrue("less than", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
filter.reset();
byte[] buffer = kv.getBuffer();
- Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("less than", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
filter.reset();
@@ -117,7 +117,7 @@ public class TestSingleColumnValueFilter {
assertTrue("Equals 100", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
filter.reset();
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("Equals 100", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
filter.reset();
@@ -126,7 +126,7 @@ public class TestSingleColumnValueFilter {
assertTrue("include 120", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
filter.reset();
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("include 120", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
}
@@ -135,29 +135,29 @@ public class TestSingleColumnValueFilter {
KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_2);
assertTrue("basicFilter1", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
byte[] buffer = kv.getBuffer();
- Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("basicFilter1", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_3);
assertTrue("basicFilter2", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("basicFilter2", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_4);
assertTrue("basicFilter3", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("basicFilter3", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
assertFalse("basicFilterNotNull", filter.filterRow());
filter.reset();
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
assertTrue("basicFilter4", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("basicFilter4", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_2);
assertTrue("basicFilter4", filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("basicFilter4", filter.filterKeyValue(c) == Filter.ReturnCode.NEXT_ROW);
assertFalse("basicFilterAllRemaining", filter.filterAllRemaining());
assertTrue("basicFilterNotNull", filter.filterRow());
@@ -166,12 +166,12 @@ public class TestSingleColumnValueFilter {
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
assertTrue("basicFilter5", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("basicFilter5", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_2);
assertTrue("basicFilter5", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("basicFilter5", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
assertFalse("basicFilterNotNull", filter.filterRow());
}
@@ -181,14 +181,14 @@ public class TestSingleColumnValueFilter {
KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, FULLSTRING_1);
assertTrue("null1", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
byte[] buffer = kv.getBuffer();
- Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("null1", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
assertFalse("null1FilterRow", filter.filterRow());
filter.reset();
kv = new KeyValue(ROW, COLUMN_FAMILY, Bytes.toBytes("qual2"), FULLSTRING_2);
assertTrue("null2", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("null2", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
assertTrue("null2FilterRow", filter.filterRow());
}
@@ -200,13 +200,13 @@ public class TestSingleColumnValueFilter {
assertTrue("substrTrue",
filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
byte[] buffer = kv.getBuffer();
- Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("substrTrue", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER,
FULLSTRING_2);
assertTrue("substrFalse", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("substrFalse", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
assertFalse("substrFilterAllRemaining", filter.filterAllRemaining());
assertFalse("substrFilterNotNull", filter.filterRow());
@@ -219,13 +219,13 @@ public class TestSingleColumnValueFilter {
assertTrue("regexTrue",
filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
byte[] buffer = kv.getBuffer();
- Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("regexTrue", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER,
FULLSTRING_2);
assertTrue("regexFalse", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
buffer = kv.getBuffer();
- c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("regexFalse", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
assertFalse("regexFilterAllRemaining", filter.filterAllRemaining());
assertFalse("regexFilterNotNull", filter.filterRow());
@@ -238,7 +238,7 @@ public class TestSingleColumnValueFilter {
assertTrue("regexTrue",
filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
byte[] buffer = kv.getBuffer();
- Cell c = new ByteBufferedCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
+ Cell c = new ByteBufferCellImpl(ByteBuffer.wrap(buffer), 0, buffer.length);
assertTrue("regexTrue", filter.filterKeyValue(c) == Filter.ReturnCode.INCLUDE);
assertFalse("regexFilterAllRemaining", filter.filterAllRemaining());
assertFalse("regexFilterNotNull", filter.filterRow());
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 62506ad..3b4d068 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -44,7 +44,6 @@ public class TestCellFlatSet extends TestCase {
private Cell descCells[];
private CellArrayMap descCbOnHeap;
private final static Configuration CONF = new Configuration();
- private HeapMemStoreLAB mslab;
private KeyValue lowerOuterCell;
private KeyValue upperOuterCell;
@@ -73,9 +72,8 @@ public class TestCellFlatSet extends TestCase {
descCells = new Cell[] {kv4,kv3,kv2,kv1};
descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true);
CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
- CONF.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
+ CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
MemStoreChunkPool.chunkPoolDisabled = false;
- mslab = new HeapMemStoreLAB(CONF);
}
/* Create and test CellSet based on CellArrayMap */
http://git-wip-us.apache.org/repos/asf/hbase/blob/86e17858/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 4f2b12f..d1bbd50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -87,7 +89,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
super.internalSetUp();
Configuration conf = new Configuration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
- conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
+ conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
@@ -95,7 +97,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
this.regionServicesForStores = region.getRegionServicesForStores();
this.store = new HStore(region, hcd, conf);
- chunkPool = MemStoreChunkPool.getPool(conf);
+ long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+ .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
+ chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
+ MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
assertTrue(chunkPool != null);
}