You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/06/30 20:18:29 UTC
svn commit: r1498167 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: liyin
Date: Sun Jun 30 18:18:28 2013
New Revision: 1498167
URL: http://svn.apache.org/r1498167
Log:
[HBASE-8837] Initial commit of SLAB allocator for the MemStore.
Author: shaneh
Summary:
Slab allocator for the HBase MemStore. Copies KeyValues into large,
optionally pooled byte[] chunks (a slab cache for the MemStore) to address
the heap fragmentation that causes stop-the-world garbage collections in the JVM.
Test Plan: Tested on HBASEDEV139 using YCSB to generate load.
Reviewers: liyintang, rshroff, manukranthk, adela
Reviewed By: manukranthk
CC: hbase-eng@, arice
Differential Revision: https://phabricator.fb.com/D835989
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sun Jun 30 18:18:28 2013
@@ -790,6 +790,26 @@ public final class HConstants {
public static final String HBASE_ENABLE_QOS_KEY = "hbase.enable.qos";
public static final String HBASE_ENABLE_SYNCFILERANGE_THROTTLING_KEY = "hbase.enable.syncfilerange.throttling";
+ /*
+ * MSLAB Constants
+ */
+ // Max size of the global MemStoreChunkPool as a fraction of the global memstore
+ // limit; must be at most 1.0, and a value <= 0 disables the pool.
+ public final static String MSLAB_CHUNK_POOL_MAX_SIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
+ // Fraction (0.0 - 1.0) of the pool's max chunk count allocated when the pool is created.
+ public final static String MSLAB_CHUNK_POOL_INITIAL_SIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
+ public final static float MSLAB_POOL_MAX_SIZE_DEFAULT = 0.0f;
+ public final static float MSLAB_POOL_INITIAL_SIZE_DEFAULT = 0.0f;
+
+ // Size in bytes of each MSLAB chunk.
+ public final static String MSLAB_CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
+ public final static int MSLAB_CHUNK_SIZE_DEFAULT = 2 * 1024 * 1024;
+
+ // Must not exceed the chunk size (checked when a MemStoreLAB is constructed).
+ public final static String MSLAB_MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
+ public final static int MSLAB_MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through allocator
+
+ public final static String MSLAB_MAX_SIZE_KEY = "hbase.hregion.memstore.mslab.max.size";
+ public final static float MSLAB_MAX_SIZE_DEFAULT = 1.25f; // Stop using an MSLAB once its chunks add up to this multiple of the memstore flush size
+
+ // Inclusive range accepted for MSLAB_MAX_SIZE_KEY; other values fall back to the default.
+ public final static float MSLAB_PCT_LOWER_LIMIT = 0.0f;
+ public final static float MSLAB_PCT_UPPER_LIMIT = 2.0f;
+
private HConstants() {
// Can't be instantiated with this constructor.
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sun Jun 30 18:18:28 2013
@@ -33,6 +33,8 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@@ -55,6 +57,12 @@ import org.apache.hadoop.hbase.util.Coll
public class MemStore implements HeapSize {
private static final Log LOG = LogFactory.getLog(MemStore.class);
+ // Whether KeyValues added to this MemStore are copied into a MemStoreLAB
+ static final String USE_MSLAB_KEY =
+ "hbase.hregion.memstore.mslab.enabled";
+ private static final boolean USE_MSLAB_DEFAULT = false;
+
+ // Kept so that snapshot() can create a fresh MemStoreLAB with the same settings
+ private Configuration conf;
+
// MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
// better semantics. The Map will overwrite if passed a key it already had
// whereas the Set will not add new KV if key is same though value might be
@@ -84,18 +92,28 @@ public class MemStore implements HeapSiz
TimeRangeTracker timeRangeTracker;
TimeRangeTracker snapshotTimeRangeTracker;
+ // Chunk pool handed to every MemStoreLAB; null if MSLAB or the pool is disabled
+ MemStoreChunkPool chunkPool;
+ // MSLAB backing the active kvset; null when MSLAB is disabled
+ volatile MemStoreLAB allocator;
+ // MSLAB backing the current snapshot; closed and cleared by clearSnapshot()
+ volatile MemStoreLAB snapshotAllocator;
+
+ // Heap size of the KeyValues that were actually copied into an MSLAB; KVs
+ // the allocator refused are not counted. NOTE(review): not reset or read in
+ // the visible hunks -- confirm whether it is still needed.
+ private final AtomicLong successfullyAllocatedKvBytes;
+
/**
* Default constructor. Used for tests.
*/
public MemStore() {
- this(KeyValue.COMPARATOR);
+ // MSLAB is used only if USE_MSLAB_KEY is set in the created configuration
+ // (USE_MSLAB_DEFAULT is false).
+ this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
}
/**
* Constructor.
* @param c Comparator
*/
- public MemStore(final KeyValue.KVComparator c) {
+ public MemStore(final Configuration conf,
+ final KeyValue.KVComparator c) {
+ this.conf = conf;
this.comparator = c;
this.comparatorIgnoreTimestamp =
this.comparator.getComparatorIgnoringTimestamps();
@@ -107,6 +125,17 @@ public class MemStore implements HeapSiz
this.size = new AtomicLong(DEEP_OVERHEAD);
this.numDeletesInKvSet = new AtomicLong(0);
this.numDeletesInSnapshot = new AtomicLong(0);
+
+ this.successfullyAllocatedKvBytes = new AtomicLong(0);
+
+ if (conf.getBoolean(USE_MSLAB_KEY, USE_MSLAB_DEFAULT)) {
+ this.chunkPool = MemStoreChunkPool.getPool(conf);
+ this.allocator = new MemStoreLAB(conf, chunkPool);
+ } else {
+ this.allocator = null;
+ this.chunkPool = null;
+ }
+
}
void dump() {
@@ -137,6 +166,13 @@ public class MemStore implements HeapSiz
this.kvset = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = this.timeRangeTracker;
this.timeRangeTracker = new TimeRangeTracker();
+
+ this.snapshotAllocator = this.allocator;
+ // Reset allocator so we get a fresh buffer for the new memstore
+ if (allocator != null) {
+ this.allocator = new MemStoreLAB(conf, chunkPool);
+ }
+
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
this.numDeletesInSnapshot = numDeletesInKvSet;
@@ -168,6 +204,7 @@ public class MemStore implements HeapSiz
*/
void clearSnapshot(final SortedSet<KeyValue> ss)
throws UnexpectedException {
+ MemStoreLAB tmpAllocator = null;
this.lock.writeLock().lock();
try {
if (this.snapshot != ss) {
@@ -180,9 +217,16 @@ public class MemStore implements HeapSiz
this.snapshot = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
+ if (this.snapshotAllocator != null) {
+ tmpAllocator = this.snapshotAllocator;
+ this.snapshotAllocator = null;
+ }
} finally {
this.lock.writeLock().unlock();
}
+ if (tmpAllocator != null) {
+ tmpAllocator.close();
+ }
}
/**
@@ -194,10 +238,11 @@ public class MemStore implements HeapSiz
long s = -1;
this.lock.readLock().lock();
try {
- s = heapSizeChange(kv, this.kvset.add(kv));
- timeRangeTracker.includeTimestamp(kv);
+ KeyValue toAdd = maybeCloneWithAllocator(kv);
+ s = heapSizeChange(toAdd, this.kvset.add(toAdd));
+ timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
- if (kv.isDelete()) {
+ if (toAdd.isDelete()) {
this.numDeletesInKvSet.incrementAndGet();
}
} finally {
@@ -206,25 +251,33 @@ public class MemStore implements HeapSiz
return s;
}
+  /**
+   * Copy the given KeyValue into space handed out by the current MSLAB.
+   *
+   * @param kv the KeyValue being added to the memstore
+   * @return a copy of <code>kv</code> backed by an MSLAB chunk, or
+   *         <code>kv</code> itself when MSLAB is disabled or the allocator
+   *         refused the request
+   */
+  private KeyValue maybeCloneWithAllocator(KeyValue kv) {
+    MemStoreLAB lab = this.allocator;
+    MemStoreLAB.Allocation slice = (lab == null) ? null : lab.allocateBytes(kv.getLength());
+    if (slice == null) {
+      // No MSLAB, or it refused the request (too large / slab limit reached).
+      return kv;
+    }
+    final byte[] dest = slice.getData();
+    final int destOffset = slice.getOffset();
+    final int length = kv.getLength();
+    System.arraycopy(kv.getBuffer(), kv.getOffset(), dest, destOffset, length);
+    KeyValue copy = new KeyValue(dest, destOffset, length);
+    // Carry over the memstore timestamp of the original KeyValue.
+    copy.setMemstoreTS(kv.getMemstoreTS());
+    this.successfullyAllocatedKvBytes.addAndGet(copy.heapSize());
+    return copy;
+  }
+
+
/**
* Write a delete
* @param delete
* @return approximate size of the passed key and value.
*/
long delete(final KeyValue delete) {
- long s = 0;
- this.lock.readLock().lock();
- try {
- s += heapSizeChange(delete, this.kvset.add(delete));
- timeRangeTracker.includeTimestamp(delete);
- } finally {
- this.lock.readLock().unlock();
- }
- if (delete.isDelete()) {
- this.numDeletesInKvSet.incrementAndGet();
- }
- this.size.addAndGet(s);
- return s;
+ // Delegate to add() so delete markers are also copied into the MSLAB and
+ // counted in numDeletesInKvSet, exactly like any other KeyValue.
+ return add(delete);
}
/**
@@ -524,6 +577,10 @@ public class MemStore implements HeapSiz
volatile KeyValueSkipListSet kvsetAtCreation;
volatile KeyValueSkipListSet snapshotAtCreation;
+ // The allocator and snapshot allocator at the time of creating this scanner
+ volatile MemStoreLAB allocatorAtCreation;
+ volatile MemStoreLAB snapshotAllocatorAtCreation;
+
/*
Some notes...
@@ -548,6 +605,15 @@ public class MemStore implements HeapSiz
kvsetAtCreation = kvset;
snapshotAtCreation = snapshot;
+ if (allocator != null) {
+ this.allocatorAtCreation = allocator;
+ this.allocatorAtCreation.incScannerCount();
+ }
+ if (snapshotAllocator != null) {
+ this.snapshotAllocatorAtCreation = snapshotAllocator;
+ this.snapshotAllocatorAtCreation.incScannerCount();
+ }
+
//DebugPrint.println(" MS new@" + hashCode());
}
@@ -664,6 +730,15 @@ public class MemStore implements HeapSiz
this.kvsetIt = null;
this.snapshotIt = null;
+
+ if (allocatorAtCreation != null) {
+ this.allocatorAtCreation.decScannerCount();
+ this.allocatorAtCreation = null;
+ }
+ if (snapshotAllocatorAtCreation != null) {
+ this.snapshotAllocatorAtCreation.decScannerCount();
+ this.snapshotAllocatorAtCreation = null;
+ }
}
/**
@@ -719,7 +794,7 @@ public class MemStore implements HeapSiz
}
/**
- * Get the entire heap usage for this MemStore not including keys in the
+ * Get the entire heap usage for this MemStore not including keys in the
* snapshot.
*/
@Override
@@ -728,7 +803,7 @@ public class MemStore implements HeapSiz
}
/**
- * Get the heap usage of KVs in this MemStore.
+ * Get the heap usage of KVs in this MemStore.
*/
public long keySize() {
return heapSize() - DEEP_OVERHEAD;
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java?rev=1498167&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java Sun Jun 30 18:18:28 2013
@@ -0,0 +1,238 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A pool of {@link MemStoreLAB.Chunk} instances.
+ *
+ * MemStoreChunkPool caches retired chunks so that they can be reused. Reusing
+ * chunks reduces the number of large arrays allocated while writing, which
+ * eases the load on the JVM garbage collector.
+ *
+ * The pool instance is globally unique and can be obtained through
+ * {@link MemStoreChunkPool#getPool(Configuration)}.
+ *
+ * {@link MemStoreChunkPool#getChunk()} is called when a MemStoreLAB needs a
+ * new chunk, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is
+ * called when a MemStoreLAB has been closed (its memstore snapshot was
+ * cleared) and no scanner is still reading its data.
+ */
+@InterfaceAudience.Private
+public class MemStoreChunkPool {
+  private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
+
+  // Static reference to the MemStoreChunkPool
+  private static MemStoreChunkPool globalInstance;
+  /** Boolean whether we have disabled the memstore chunk pool entirely. */
+  static boolean chunkPoolDisabled = false;
+
+  // Upper bound on the number of pooled chunks. It is checked without
+  // locking, so concurrent put-backs may overshoot it slightly.
+  private final int maxCount;
+
+  // A queue of reclaimed chunks
+  private final BlockingQueue<Chunk> reclaimedChunks;
+  private final int chunkSize;
+
+  /** Executor that runs the periodic statistics logging */
+  private final ScheduledExecutorService scheduleThreadPool;
+  /** Interval, in seconds, between two statistics log lines */
+  private static final int STAT_PERIOD_SECONDS = 60 * 5;
+  private final AtomicLong createdChunkCount = new AtomicLong();
+  private final AtomicLong reusedChunkCount = new AtomicLong();
+
+  /**
+   * @param conf currently unused
+   * @param chunkSize size in bytes of every chunk created by this pool
+   * @param maxCount upper bound on the number of pooled chunks
+   * @param initialCount number of chunks to allocate up front
+   */
+  MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
+      int initialCount) {
+    this.maxCount = maxCount;
+    this.chunkSize = chunkSize;
+    this.reclaimedChunks = new LinkedBlockingQueue<Chunk>();
+    for (int i = 0; i < initialCount; i++) {
+      Chunk chunk = new Chunk(chunkSize);
+      chunk.init();
+      reclaimedChunks.add(chunk);
+    }
+    final String n = Thread.currentThread().getName();
+    scheduleThreadPool = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat(n + "-MemStoreChunkPool Statistics")
+            .setDaemon(true).build());
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsReporter(this),
+        STAT_PERIOD_SECONDS, STAT_PERIOD_SECONDS, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Poll a chunk from the pool and reset it, or create a new chunk if the pool
+   * is empty. Either way the returned chunk is uninitialized: the caller must
+   * call {@link Chunk#init()} before allocating from it.
+   * @return a chunk
+   */
+  Chunk getChunk() {
+    Chunk chunk = reclaimedChunks.poll();
+    if (chunk == null) {
+      chunk = new Chunk(chunkSize);
+      createdChunkCount.incrementAndGet();
+    } else {
+      chunk.reset();
+      reusedChunkCount.incrementAndGet();
+    }
+    return chunk;
+  }
+
+  /**
+   * Move chunks from the given queue into the pool until the pool reaches its
+   * max size; any remaining chunks are left in <code>chunks</code>.
+   * @param chunks chunks to return; drained by this call
+   */
+  void putbackChunks(BlockingQueue<Chunk> chunks) {
+    int maxNumToPutback = this.maxCount - reclaimedChunks.size();
+    if (maxNumToPutback <= 0) {
+      return;
+    }
+    chunks.drainTo(reclaimedChunks, maxNumToPutback);
+  }
+
+  /**
+   * Add the chunk to the pool; if the pool has already reached its max size,
+   * the chunk is dropped.
+   * @param chunk chunk to return
+   */
+  void putbackChunk(Chunk chunk) {
+    if (reclaimedChunks.size() >= this.maxCount) {
+      return;
+    }
+    reclaimedChunks.add(chunk);
+  }
+
+  int getPoolSize() {
+    return this.reclaimedChunks.size();
+  }
+
+  /*
+   * Only used in testing
+   */
+  void clearChunks() {
+    this.reclaimedChunks.clear();
+  }
+
+  /**
+   * Periodic task that logs the pool statistics. It is submitted to
+   * scheduleThreadPool as a plain Runnable, so it does not need to be a Thread.
+   */
+  private static class StatisticsReporter implements Runnable {
+    private final MemStoreChunkPool mcp;
+
+    StatisticsReporter(MemStoreChunkPool mcp) {
+      this.mcp = mcp;
+    }
+
+    @Override
+    public void run() {
+      mcp.logStats();
+    }
+  }
+
+  private void logStats() {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    long created = createdChunkCount.get();
+    long reused = reusedChunkCount.get();
+    long total = created + reused;
+    LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+        + ",created chunk count=" + created
+        + ",reused chunk count=" + reused
+        + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
+        (float) reused / (float) total, 2)));
+  }
+
+  /**
+   * Return the globally shared pool, creating it on first use.
+   * @param conf source of the pool and chunk size settings
+   * @return the global MemStoreChunkPool instance, or null if the pool is
+   *         disabled (e.g. its max size setting is not positive)
+   * @throws IllegalArgumentException if the max size setting is greater than
+   *           1.0, or the initial size setting is outside 0.0 - 1.0
+   */
+  static synchronized MemStoreChunkPool getPool(Configuration conf) {
+    if (globalInstance != null) {
+      return globalInstance;
+    }
+    if (chunkPoolDisabled) {
+      return null;
+    }
+
+    float poolSizePercentage = conf.getFloat(HConstants.MSLAB_CHUNK_POOL_MAX_SIZE_KEY,
+        HConstants.MSLAB_POOL_MAX_SIZE_DEFAULT);
+    if (poolSizePercentage <= 0) {
+      chunkPoolDisabled = true;
+      return null;
+    }
+    if (poolSizePercentage > 1.0) {
+      throw new IllegalArgumentException(HConstants.MSLAB_CHUNK_POOL_MAX_SIZE_KEY
+          + " must be between 0.0 and 1.0");
+    }
+    // maxCount: poolSizePercentage of the global memstore limit, in chunks
+    long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+        .getMax();
+    long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax,
+        MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf);
+    int chunkSize = conf.getInt(HConstants.MSLAB_CHUNK_SIZE_KEY,
+        HConstants.MSLAB_CHUNK_SIZE_DEFAULT);
+    int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
+
+    float initialCountPercentage = conf.getFloat(HConstants.MSLAB_CHUNK_POOL_INITIAL_SIZE_KEY,
+        HConstants.MSLAB_POOL_INITIAL_SIZE_DEFAULT);
+    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+      throw new IllegalArgumentException(HConstants.MSLAB_CHUNK_POOL_INITIAL_SIZE_KEY
+          + " must be between 0.0 and 1.0");
+    }
+
+    int initialCount = (int) (initialCountPercentage * maxCount);
+    LOG.info("Allocating MemStoreChunkPool with chunk size "
+        + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount
+        + ", initial count " + initialCount);
+    globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount,
+        initialCount);
+    return globalInstance;
+  }
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Sun Jun 30 18:18:28 2013
@@ -68,9 +68,9 @@ class MemStoreFlusher implements FlushRe
protected final long globalMemStoreLimit;
protected final long globalMemStoreLimitLowMark;
- private static final float DEFAULT_UPPER = 0.4f;
+ // Widened from private so MemStoreChunkPool.getPool() can size the pool.
+ // NOTE(review): both classes share a package, so package access would suffice.
+ protected static final float DEFAULT_UPPER = 0.4f;
private static final float DEFAULT_LOWER = 0.25f;
- private static final String UPPER_KEY =
+ protected static final String UPPER_KEY =
"hbase.regionserver.global.memstore.upperLimit";
private static final String LOWER_KEY =
"hbase.regionserver.global.memstore.lowerLimit";
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java?rev=1498167&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java Sun Jun 30 18:18:28 2013
@@ -0,0 +1,418 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A memstore-local allocation buffer.
+ * <p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that takes big
+ * (2MB by default) byte[] chunks, either newly created or borrowed from a
+ * {@link MemStoreChunkPool}, and doles out slices of them to threads that
+ * request space.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * regionserver. By ensuring that all KeyValues in a given memstore refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the memstore is flushed.
+ * <p>
+ * Without the MSLAB, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * Requests larger than {@link HConstants#MSLAB_MAX_ALLOC_KEY}, and all
+ * requests made once this instance has claimed
+ * {@link HConstants#MSLAB_MAX_SIZE_KEY} times the memstore flush size worth of
+ * chunks, are refused: {@link #allocateBytes(int)} returns null and the
+ * caller has to keep the data in ordinary heap memory.
+ * <p>
+ * TODO: we should probably benchmark whether word-aligning the allocations
+ * would provide a performance improvement - probably would speed up the
+ * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
+ * anyway
+ */
+@InterfaceAudience.Private
+public class MemStoreLAB {
+  private static final Log LOG = LogFactory.getLog(MemStoreLAB.class);
+  private final AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+  // Every chunk this instance has used; handed back to the pool once closed
+  private final BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
+
+  final int chunkSize;
+  final int maxAlloc;
+  final long slabLimit;
+
+  private final MemStoreChunkPool chunkPool;
+
+  // Set by close(), which is called when the memstore snapshot using this
+  // instance is cleared
+  private volatile boolean closed = false;
+  // Set exactly once, by whichever of close()/decScannerCount() returns the
+  // chunks to the pool
+  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
+  // Current count of open scanners reading data from this MemStoreLAB
+  private final AtomicInteger openScannerCount = new AtomicInteger();
+  // Makes sure the "slab limit reached" warning is logged once per instance
+  // rather than once per refused allocation
+  private final AtomicBoolean slabLimitWarned = new AtomicBoolean(false);
+
+  // Total bytes of chunk memory claimed by this MemStoreLAB
+  private final AtomicLong totalSLABAllocatedBytes = new AtomicLong(0L);
+
+  // Number of allocations satisfied from a chunk
+  private final AtomicLong numSuccessfulAllocations = new AtomicLong(0);
+  // Bytes handed out by successful allocations
+  private final AtomicLong successfullyAllocatedBytes = new AtomicLong(0);
+  // Number of refused allocations (too large, or slab limit reached)
+  private final AtomicLong numDeniedAllocations = new AtomicLong(0);
+  // Bytes requested by refused allocations
+  private final AtomicLong deniedAllocationBytes = new AtomicLong(0);
+
+  // Used in testing
+  public MemStoreLAB() {
+    this(new Configuration());
+  }
+
+  private MemStoreLAB(Configuration conf) {
+    this(conf, MemStoreChunkPool.getPool(conf));
+  }
+
+  /**
+   * @param conf source of the chunk size, max allocation, slab limit and
+   *          memstore flush size settings
+   * @param pool pool to take chunks from and return them to; may be null, in
+   *          which case chunks are created directly and never returned
+   * @throws IllegalArgumentException if the max allocation exceeds the chunk
+   *           size
+   */
+  public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
+    chunkSize = conf.getInt(HConstants.MSLAB_CHUNK_SIZE_KEY, HConstants.MSLAB_CHUNK_SIZE_DEFAULT);
+    maxAlloc = conf.getInt(HConstants.MSLAB_MAX_ALLOC_KEY, HConstants.MSLAB_MAX_ALLOC_DEFAULT);
+    float slabLimitPct = conf.getFloat(HConstants.MSLAB_MAX_SIZE_KEY, HConstants.MSLAB_MAX_SIZE_DEFAULT);
+    if (slabLimitPct < HConstants.MSLAB_PCT_LOWER_LIMIT || slabLimitPct > HConstants.MSLAB_PCT_UPPER_LIMIT) {
+      LOG.warn(HConstants.MSLAB_MAX_SIZE_KEY + " must be in the range " + HConstants.MSLAB_PCT_LOWER_LIMIT +
+          " to " + HConstants.MSLAB_PCT_UPPER_LIMIT + " inclusive but was " + slabLimitPct +
+          "; using " + HConstants.MSLAB_MAX_SIZE_DEFAULT + " instead.");
+      slabLimitPct = HConstants.MSLAB_MAX_SIZE_DEFAULT;
+    }
+
+    this.chunkPool = pool;
+
+    slabLimit = (long) (slabLimitPct * conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE));
+
+    // if we don't exclude allocations >CHUNK_SIZE, we'd infinite loop on one!
+    Preconditions.checkArgument(
+        maxAlloc <= chunkSize,
+        HConstants.MSLAB_MAX_ALLOC_KEY + " must be less than or equal to " + HConstants.MSLAB_CHUNK_SIZE_KEY);
+  }
+
+  /**
+   * Allocate a slice of the given length.
+   *
+   * @param size number of bytes wanted; must not be negative
+   * @return the allocation, or null if <code>size</code> exceeds the max
+   *         allocation size or this instance has reached its slab limit
+   * @throws IllegalArgumentException if <code>size</code> is negative
+   */
+  protected Allocation allocateBytes(int size) {
+    Preconditions.checkArgument(size >= 0, "negative size");
+
+    // Callers should satisfy large allocations directly from JVM since they
+    // don't cause fragmentation as badly.
+    if (size > maxAlloc) {
+      recordDeniedAllocation(size);
+      return null;
+    }
+
+    // Check if the SLAB allocator is allocating too much memory
+    if (this.totalSLABAllocatedBytes.get() >= slabLimit) {
+      recordDeniedAllocation(size);
+      // Every later request takes this path as well; warn only the first time.
+      if (slabLimitWarned.compareAndSet(false, true)) {
+        LOG.warn("MSLAB has reached the maximum size of " + slabLimit + " and will no longer be used for allocations.");
+      }
+      return null;
+    }
+
+    while (true) {
+      Chunk c = getOrMakeChunk();
+
+      // Try to allocate from this chunk
+      int allocOffset = c.alloc(size);
+      if (allocOffset != -1) {
+        // We succeeded - this is the common case - small alloc
+        // from a big buffer
+        this.numSuccessfulAllocations.incrementAndGet();
+        this.successfullyAllocatedBytes.addAndGet(size);
+        return new Allocation(c.data, allocOffset);
+      }
+
+      // not enough space!
+      // try to retire this chunk
+      tryRetireChunk(c);
+    }
+  }
+
+  /** Update the statistics for an allocation request that was refused. */
+  private void recordDeniedAllocation(int size) {
+    this.numDeniedAllocations.incrementAndGet();
+    this.deniedAllocationBytes.addAndGet(size);
+  }
+
+  /**
+   * Mark this instance as closed, since it won't be used any more. The chunks
+   * are returned to the pool right away if no scanner is reading them;
+   * otherwise the last {@link #decScannerCount()} call returns them.
+   */
+  void close() {
+    this.closed = true;
+
+    // We could put back the chunks to pool for reusing only when there is no
+    // opening scanner which will read their data
+    if (chunkPool != null && openScannerCount.get() == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
+   * Called when opening a scanner on the data of this MemStoreLAB
+   */
+  void incScannerCount() {
+    this.openScannerCount.incrementAndGet();
+  }
+
+  /**
+   * Called when closing a scanner on the data of this MemStoreLAB. If this
+   * was the last open scanner and the instance is already closed, the chunks
+   * are returned to the pool.
+   */
+  void decScannerCount() {
+    int count = this.openScannerCount.decrementAndGet();
+
+    if (chunkPool != null && count == 0 && this.closed
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+
+    assert count >= 0;
+  }
+
+  /**
+   * Try to retire the current chunk if it is still
+   * <code>c</code>. Postcondition is that curChunk.get()
+   * != c
+   */
+  private void tryRetireChunk(Chunk c) {
+    // If the CAS succeeds we won the race to retire the chunk (a natural place
+    // to record external-fragmentation metrics); if it fails, someone else
+    // already retired it. Either way the chunk stays in chunkQueue.
+    curChunk.compareAndSet(c, null);
+  }
+
+  /**
+   * Get the current chunk, or, if there is no current chunk, take one from
+   * the pool (or create one when there is no pool) and make it current.
+   */
+  private Chunk getOrMakeChunk() {
+    while (true) {
+      // Try to get the chunk
+      Chunk c = curChunk.get();
+      if (c != null) {
+        return c;
+      }
+
+      // No current chunk, so we want to allocate one. We race
+      // against other allocators to CAS in an uninitialized chunk
+      // (which is cheap to allocate)
+      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
+      if (curChunk.compareAndSet(null, c)) {
+        this.totalSLABAllocatedBytes.addAndGet(c.size);
+        // we won race - now we need to actually do the expensive
+        // allocation step
+        c.init();
+        this.chunkQueue.add(c);
+        return c;
+      } else if (chunkPool != null) {
+        // we lost the race; hand the unused chunk back to the pool
+        chunkPool.putbackChunk(c);
+      }
+      // someone else won race - that's fine, we'll try to grab theirs
+      // in the next iteration of the loop.
+    }
+  }
+
+  /**
+   * A chunk of memory out of which allocations are sliced.
+   */
+  static class Chunk {
+    /** Actual underlying data; kept across reuse of a pooled chunk */
+    private byte[] data;
+
+    private static final int UNINITIALIZED = -1;
+    private static final int OOM = -2;
+    /**
+     * Offset for the next allocation, or one of the sentinel values
+     * UNINITIALIZED (init() has not completed yet) or OOM (allocating the
+     * backing array failed).
+     */
+    private final AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+    /** Total number of allocations satisfied from this buffer */
+    private final AtomicInteger allocCount = new AtomicInteger();
+
+    /** Size of chunk in bytes */
+    private final int size;
+
+    /**
+     * Create an uninitialized chunk. Note that memory is not allocated yet, so
+     * this is cheap.
+     * @param size in bytes
+     */
+    Chunk(int size) {
+      this.size = size;
+    }
+
+    /**
+     * Actually claim the memory for this chunk, unless an earlier use of a
+     * pooled chunk already did. Only one thread may call this per use of the
+     * chunk. It is thread-safe against other threads calling alloc(), who will
+     * spin until the initialization is complete.
+     */
+    protected void init() {
+      assert nextFreeOffset.get() == UNINITIALIZED;
+      try {
+        if (data == null) {
+          data = new byte[size];
+        }
+      } catch (OutOfMemoryError e) {
+        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+        assert failInit; // should be true.
+        throw e;
+      }
+      // Mark that it's ready for use
+      boolean initialized = nextFreeOffset.compareAndSet(
+          UNINITIALIZED, 0);
+      // We should always succeed the above CAS since only one thread
+      // calls init()!
+      Preconditions.checkState(initialized,
+          "Multiple threads tried to init same chunk");
+    }
+
+    /**
+     * Reset the offset to UNINITIALIZED before reusing an old chunk. The
+     * backing array is kept, so the next init() does not reallocate it.
+     */
+    void reset() {
+      if (nextFreeOffset.get() != UNINITIALIZED) {
+        nextFreeOffset.set(UNINITIALIZED);
+        allocCount.set(0);
+      }
+    }
+
+    /**
+     * Try to allocate <code>size</code> bytes from the chunk.
+     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
+     */
+    public int alloc(int size) {
+      while (true) {
+        int oldOffset = nextFreeOffset.get();
+        if (oldOffset == UNINITIALIZED) {
+          // The chunk doesn't have its data allocated yet.
+          // Since we found this in curChunk, we know that whoever
+          // CAS-ed it there is allocating it right now. So spin-loop
+          // shouldn't spin long!
+          Thread.yield();
+          continue;
+        }
+        if (oldOffset == OOM) {
+          // doh we ran out of ram. return -1 to chuck this away.
+          return -1;
+        }
+
+        if (oldOffset + size > data.length) {
+          return -1; // alloc doesn't fit
+        }
+
+        // Try to atomically claim this chunk
+        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+          // we got the alloc
+          allocCount.incrementAndGet();
+          return oldOffset;
+        }
+        // we raced and lost alloc, try again
+      }
+    }
+
+    @Override
+    public String toString() {
+      int offset = nextFreeOffset.get();
+      // Before init() succeeds data may be null and offset is a negative sentinel
+      String waste = (data == null || offset < 0)
+          ? "n/a" : String.valueOf(data.length - offset);
+      return "Chunk@" + System.identityHashCode(this) +
+          " allocs=" + allocCount.get() + " waste=" + waste;
+    }
+  }
+
+  /**
+   * The result of a single allocation. Contains the chunk array that the
+   * allocation points into, and the offset in this array where the
+   * slice begins.
+   */
+  public static class Allocation {
+    private final byte[] data;
+    private final int offset;
+
+    private Allocation(byte[] data, int off) {
+      this.data = data;
+      this.offset = off;
+    }
+
+    @Override
+    public String toString() {
+      return "Allocation(" + "capacity=" + data.length + ", off=" + offset
+          + ")";
+    }
+
+    byte[] getData() {
+      return data;
+    }
+
+    int getOffset() {
+      return offset;
+    }
+  }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Jun 30 18:18:28 2013
@@ -188,7 +188,7 @@ public class Store extends SchemaConfigu
* @param region
* @param family HColumnDescriptor for this column
* @param fs file system object
- * @param conf configuration object
+ * @param confParam configuration object
* failed. Can be null.
* @throws IOException
*/
@@ -250,7 +250,7 @@ public class Store extends SchemaConfigu
LOG.info("time to purge deletes set to " + timeToPurgeDeletes +
"ms in store " + this);
- this.memstore = new MemStore(this.comparator);
+ this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = getColumnFamilyName();
// Setting up cache configuration for this family
@@ -698,7 +698,9 @@ public class Store extends SchemaConfigu
/**
* Snapshot this stores memstore. Call before running
- * {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
+ * {@link #flushCache(long, java.util.SortedSet, TimeRangeTracker,
+ * org.apache.hadoop.hbase.monitoring.MonitoredTask)}
+ * so it has some work to do.
*/
void snapshot() {
this.memstore.snapshot();
@@ -999,18 +1001,18 @@ public class Store extends SchemaConfigu
* Existing StoreFiles are not destroyed until the new compacted StoreFile is
* completely written-out to disk.
*
- * @param CompactionRequest
+ * @param compactionRequest
* compaction details obtained from requestCompaction()
* @throws IOException
*/
- void compact(CompactionRequest cr) throws IOException {
- if (cr == null || cr.getFiles().isEmpty()) {
+ void compact(CompactionRequest compactionRequest) throws IOException {
+ if (compactionRequest == null || compactionRequest.getFiles().isEmpty()) {
return;
}
- Preconditions.checkArgument(cr.getStore().toString()
+ Preconditions.checkArgument(compactionRequest.getStore().toString()
.equals(this.toString()));
- List<StoreFile> filesToCompact = cr.getFiles();
+ List<StoreFile> filesToCompact = compactionRequest.getFiles();
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
@@ -1023,31 +1025,32 @@ public class Store extends SchemaConfigu
// Ready to go. Have list of files to compact.
MonitoredTask status = TaskMonitor.get().createStatus(
- (cr.isMajor() ? "Major " : "") + "Compaction (ID: " + cr.getCompactSelectionID() + ") of "
+ (compactionRequest.isMajor() ? "Major " : "")
+ + "Compaction (ID: " + compactionRequest.getCompactSelectionID() + ") of "
+ this.storeNameStr + " on "
+ this.region.getRegionInfo().getRegionNameAsString());
- LOG.info("Starting compaction (ID: " + cr.getCompactSelectionID() + ") of "
+ LOG.info("Starting compaction (ID: " + compactionRequest.getCompactSelectionID() + ") of "
+ filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
- + StringUtils.humanReadableInt(cr.getSize()));
+ + StringUtils.humanReadableInt(compactionRequest.getSize()));
StoreFile sf = null;
try {
status.setStatus("Compacting " + filesToCompact.size() + " file(s)");
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
- StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
+ StoreFile.Writer writer = compactStores(filesToCompact, compactionRequest.isMajor(), maxId);
// Move the compaction into place.
sf = completeCompaction(filesToCompact, writer);
// Report that the compaction is complete.
status.markComplete("Completed compaction");
- LOG.info("Completed" + (cr.isMajor() ? " major " : " ")
- + "compaction (ID: " + cr.getCompactSelectionID() + ") of "
+ LOG.info("Completed" + (compactionRequest.isMajor() ? " major " : " ")
+ + "compaction (ID: " + compactionRequest.getCompactSelectionID() + ") of "
+ filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ "; This selection was in queue for "
- + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took "
+ + StringUtils.formatTimeDiff(compactionStartTime, compactionRequest.getSelectionTime()) + ", and took "
+ StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
compactionStartTime)
+ " to execute. New storefile name="
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Sun Jun 30 18:18:28 2013
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,6 +34,7 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -475,7 +478,7 @@ public class TestMemStore extends TestCa
}
public void testMultipleVersionsSimple() throws Exception {
- MemStore m = new MemStore(KeyValue.COMPARATOR);
+ MemStore m = new MemStore(HBaseConfiguration.create() , KeyValue.COMPARATOR);
byte [] row = Bytes.toBytes("testRow");
byte [] family = Bytes.toBytes("testFamily");
byte [] qf = Bytes.toBytes("testQualifier");
@@ -495,7 +498,7 @@ public class TestMemStore extends TestCa
}
public void testBinary() throws IOException {
- MemStore mc = new MemStore(KeyValue.ROOT_COMPARATOR);
+ MemStore mc = new MemStore(HBaseConfiguration.create(), KeyValue.ROOT_COMPARATOR);
final int start = 43;
final int end = 46;
for (int k = start; k <= end; k++) {
@@ -763,7 +766,6 @@ public class TestMemStore extends TestCa
assertEquals(delete, memstore.kvset.first());
}
-
////////////////////////////////////
//Test for timestamps
////////////////////////////////////
@@ -796,6 +798,52 @@ public class TestMemStore extends TestCa
//assertTrue(!memstore.shouldSeek(scan));
}
+ ////////////////////////////////////
+ //Test for OOM with MSLAB
+ ////////////////////////////////////
+
+  /**
+   * Demonstrates a pathological upsert pattern that shows why the MSLAB is
+   * not currently used for upsert workloads. Data is written as:
+   *
+   * - row0001 through row1000 (fills up one 2M chunk)
+   * - row0002 through row1001 (fills up another 2M chunk, leaving one
+   *   reference to the first chunk)
+   * - row0003 through row1002 (another chunk, another dangling reference)
+   *
+   * Per the original author, this causes OOME pretty quickly when MSLAB is
+   * used for upserts, since each 2M chunk is held onto by a single reference.
+   * The method makes no assertions; it only prints heap usage before/after.
+   */
+  public void testUpsertMSLAB() throws Exception {
+    Configuration mslabConf = HBaseConfiguration.create();
+    mslabConf.setBoolean(MemStore.USE_MSLAB_KEY, true);
+    memstore = new MemStore(mslabConf, KeyValue.COMPARATOR);
+
+    final int rowSize = 2048;
+    // Presumably the -4 accounts for the 4-byte int row key; confirm intent.
+    byte[] qualifier = new byte[rowSize - 4];
+
+    MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+    final int gcRounds = 3;
+    for (int round = 0; round < gcRounds; round++) {
+      System.gc();
+    }
+    long heapBefore = memoryBean.getHeapMemoryUsage().getUsed();
+
+    long totalDelta = 0;
+    long timestamp = 0;
+    final int windows = 1000;
+    final int rowsPerWindow = 1000;
+    // Each window shifts the row range by one, leaving one row behind.
+    for (int value = 0; value < windows; value++) {
+      int endRow = value + rowsPerWindow;
+      for (int row = value; row < endRow; row++) {
+        timestamp++;
+        totalDelta += memstore.updateColumnValue(
+            Bytes.toBytes(row), FAMILY, qualifier, value, timestamp);
+      }
+    }
+    System.out.println("Wrote " + timestamp + " vals");
+
+    for (int round = 0; round < gcRounds; round++) {
+      System.gc();
+    }
+    long heapAfter = memoryBean.getHeapMemoryUsage().getUsed();
+    long heapDelta = heapAfter - heapBefore;
+    System.out.println("Memory used: " + heapDelta
+        + " (heapsize: " + memstore.heapSize()
+        + " size: " + totalDelta + ")");
+  }
+
//////////////////////////////////////////////////////////////////////////////
// Helpers
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java?rev=1498167&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java Sun Jun 30 18:18:28 2013
@@ -0,0 +1,178 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+
+/**
+ * Unit tests for {@link MemStoreLAB}: sequential allocation layout within a
+ * chunk, rejection of oversized allocations, and non-overlap of allocations
+ * made concurrently from many threads.
+ */
+public class TestMemStoreLAB {
+
+  /**
+   * Test a bunch of random allocations from a single thread. Within one
+   * backing array, each allocation must start exactly where the previous one
+   * ended and must fit inside the array; a new backing array restarts at
+   * offset 0. The random seed is included in failure messages so that a
+   * failing run can be reproduced.
+   */
+  @Test
+  public void testLABRandomAllocation() {
+    long seed = System.currentTimeMillis();
+    Random rand = new Random(seed);
+    MemStoreLAB mslab = new MemStoreLAB();
+    int expectedOff = 0;
+    byte[] lastBuffer = null;
+    // 100K iterations by 0-1K alloc -> 50MB expected
+    // should be reasonable for unit test and also cover wraparound
+    // behavior
+    for (int i = 0; i < 100000; i++) {
+      int size = rand.nextInt(1000);
+      Allocation alloc = mslab.allocateBytes(size);
+
+      if (alloc.getData() != lastBuffer) {
+        // Moved on to a different backing array: expect offset 0 again.
+        expectedOff = 0;
+        lastBuffer = alloc.getData();
+      }
+      assertEquals("seed=" + seed, expectedOff, alloc.getOffset());
+      assertTrue("Allocation " + alloc + " overruns buffer (seed=" + seed + ")",
+          alloc.getOffset() + size <= alloc.getData().length);
+      expectedOff += size;
+    }
+  }
+
+  /**
+   * An allocation one byte larger than the configured maximum must not be
+   * served by the LAB; allocateBytes is expected to return null.
+   */
+  @Test
+  public void testLABLargeAllocation() {
+    MemStoreLAB mslab = new MemStoreLAB();
+    Allocation alloc = mslab.allocateBytes(HConstants.MSLAB_MAX_ALLOC_DEFAULT + 1);
+    assertNull("Allocation larger than the max allocation allowed shouldn't be satisfied by LAB.",
+        alloc);
+  }
+
+  /**
+   * Test allocation from lots of threads, making sure the results don't
+   * overlap in any way
+   */
+  @Test
+  public void testLABThreading() throws Exception {
+    Configuration conf = new Configuration();
+    MultithreadedTestUtil.TestContext ctx =
+        new MultithreadedTestUtil.TestContext(conf);
+
+    final AtomicInteger totalAllocated = new AtomicInteger();
+
+    final MemStoreLAB mslab = new MemStoreLAB();
+    List<List<AllocRecord>> allocations = Lists.newArrayList();
+
+    for (int i = 0; i < 10; i++) {
+      // Each list is written only by its own thread and read by this thread
+      // after ctx.stop().
+      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
+      allocations.add(allocsByThisThread);
+
+      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+        private Random r = new Random();
+        @Override
+        public void doAnAction() throws Exception {
+          int size = r.nextInt(1000);
+          Allocation alloc = mslab.allocateBytes(size);
+          totalAllocated.addAndGet(size);
+          allocsByThisThread.add(new AllocRecord(alloc, size));
+        }
+      };
+      ctx.addThread(t);
+    }
+
+    ctx.startThreads();
+    while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) {
+      Thread.sleep(10);
+    }
+    ctx.stop();
+
+    // Partition the allocations by the actual byte[] they point into,
+    // make sure offsets are unique for each chunk. Arrays use identity
+    // equals/hashCode, so the HashMap groups by array instance.
+    Map<byte[], Map<Integer, AllocRecord>> mapsByChunk =
+        Maps.newHashMap();
+
+    int sizeCounted = 0;
+    for (AllocRecord rec : Iterables.concat(allocations)) {
+      sizeCounted += rec.size;
+      // Zero-size allocations share an offset with their successor; skip them.
+      if (rec.size == 0) continue;
+
+      Map<Integer, AllocRecord> mapForThisByteArray =
+          mapsByChunk.get(rec.alloc.getData());
+      if (mapForThisByteArray == null) {
+        mapForThisByteArray = Maps.newTreeMap();
+        mapsByChunk.put(rec.alloc.getData(), mapForThisByteArray);
+      }
+      AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec);
+      assertNull("Already had an entry " + oldVal + " for allocation " + rec,
+          oldVal);
+    }
+    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());
+
+    // Now check each byte array to make sure allocations don't overlap:
+    // walking offsets in ascending (TreeMap) order, each must start where
+    // the previous one ended.
+    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
+      int expectedOff = 0;
+      for (AllocRecord alloc : allocsInChunk.values()) {
+        assertEquals(expectedOff, alloc.alloc.getOffset());
+        assertTrue("Allocation " + alloc + " overruns buffer",
+            alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getData().length);
+        expectedOff += alloc.size;
+      }
+    }
+
+  }
+
+  /** An allocation paired with the number of bytes that were requested for it. */
+  private static class AllocRecord {
+    private final Allocation alloc;
+    private final int size;
+
+    public AllocRecord(Allocation alloc, int size) {
+      this.alloc = alloc;
+      this.size = size;
+    }
+
+    @Override
+    public String toString() {
+      return "AllocRecord(alloc=" + alloc + ", size=" + size + ")";
+    }
+  }
+}