You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/04/26 22:52:20 UTC
[13/40] hbase git commit: HBASE-16438 Create a cell type so that
chunk id is embedded in it (Ram)
HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/972e8c8c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/972e8c8c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/972e8c8c
Branch: refs/heads/hbase-12439
Commit: 972e8c8c296d38507077b98c8fc2a33eda9fce66
Parents: 6e962d6
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Apr 19 15:28:03 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Apr 19 15:28:03 2017 +0530
----------------------------------------------------------------------
.../java/org/apache/hadoop/hbase/CellUtil.java | 24 --
.../org/apache/hadoop/hbase/ExtendedCell.java | 10 +
.../org/apache/hadoop/hbase/master/HMaster.java | 2 +
.../hbase/regionserver/ByteBufferChunkCell.java | 48 +++
.../apache/hadoop/hbase/regionserver/Chunk.java | 60 ++-
.../hadoop/hbase/regionserver/ChunkCreator.java | 404 +++++++++++++++++++
.../hbase/regionserver/HRegionServer.java | 14 +-
.../hbase/regionserver/MemStoreChunkPool.java | 265 ------------
.../hadoop/hbase/regionserver/MemStoreLAB.java | 4 +-
.../hbase/regionserver/MemStoreLABImpl.java | 171 ++++----
.../regionserver/NoTagByteBufferChunkCell.java | 48 +++
.../hadoop/hbase/regionserver/OffheapChunk.java | 31 +-
.../hadoop/hbase/regionserver/OnheapChunk.java | 32 +-
.../hadoop/hbase/HBaseTestingUtility.java | 3 +
.../coprocessor/TestCoprocessorInterface.java | 4 +
.../TestRegionObserverScannerOpenHook.java | 3 +
.../coprocessor/TestRegionObserverStacking.java | 3 +
.../io/hfile/TestScannerFromBucketCache.java | 3 +
.../hadoop/hbase/master/TestCatalogJanitor.java | 7 +
.../hadoop/hbase/regionserver/TestBulkLoad.java | 2 +-
.../hbase/regionserver/TestCellFlatSet.java | 2 +-
.../regionserver/TestCompactingMemStore.java | 37 +-
.../TestCompactingToCellArrayMapMemStore.java | 16 +-
.../TestCompactionArchiveConcurrentClose.java | 1 +
.../TestCompactionArchiveIOException.java | 1 +
.../regionserver/TestCompactionPolicy.java | 1 +
.../hbase/regionserver/TestDefaultMemStore.java | 14 +-
.../regionserver/TestFailedAppendAndSync.java | 1 +
.../hbase/regionserver/TestHMobStore.java | 2 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 2 +
.../regionserver/TestHRegionReplayEvents.java | 2 +-
.../regionserver/TestMemStoreChunkPool.java | 48 +--
.../hbase/regionserver/TestMemStoreLAB.java | 119 +++---
.../TestMemstoreLABWithoutPool.java | 168 ++++++++
.../hbase/regionserver/TestRecoveredEdits.java | 1 +
.../hbase/regionserver/TestRegionIncrement.java | 1 +
.../regionserver/TestReversibleScanners.java | 7 +-
.../hadoop/hbase/regionserver/TestStore.java | 1 +
.../TestStoreFileRefresherChore.java | 1 +
.../hbase/regionserver/TestWALLockup.java | 1 +
.../TestWALMonotonicallyIncreasingSeqId.java | 1 +
.../regionserver/wal/AbstractTestFSWAL.java | 2 +
.../hbase/regionserver/wal/TestDurability.java | 3 +
.../hbase/regionserver/wal/TestFSHLog.java | 4 +-
44 files changed, 1055 insertions(+), 519 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index e1bc969..56de21b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -3135,28 +3135,4 @@ public final class CellUtil {
return Type.DeleteFamily.getCode();
}
}
-
- /**
- * Clone the passed cell by copying its data into the passed buf.
- */
- public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) {
- int tagsLen = cell.getTagsLength();
- if (cell instanceof ExtendedCell) {
- ((ExtendedCell) cell).write(buf, offset);
- } else {
- // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
- // other case also. The data fragments within Cell is copied into buf as in KeyValue
- // serialization format only.
- KeyValueUtil.appendTo(cell, buf, offset, true);
- }
- if (tagsLen == 0) {
- // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
- // which directly return tagsLen as 0. So we avoid parsing many length components in
- // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
- // call getTagsLength().
- return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
- } else {
- return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
index 517873f..10f20ca 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize,
Cloneable {
+ public static int CELL_NOT_BASED_ON_CHUNK = -1;
/**
* Write this cell to an OutputStream in a {@link KeyValue} format.
* <br> KeyValue format <br>
@@ -73,4 +74,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
* @return The deep cloned cell
*/
Cell deepClone();
+
+ /**
+ * Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized
+ * chunks as in case of MemstoreLAB
+ * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1
+ */
+ default int getChunkId() {
+ return CELL_NOT_BASED_ON_CHUNK;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bb9f282..f9670e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -748,6 +748,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
+ // Initialize the chunkCreator
+ initializeMemStoreChunkCreator();
this.fileSystemManager = new MasterFileSystem(this);
this.walManager = new MasterWalManager(this);
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
new file mode 100644
index 0000000..a8f1000
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * ByteBuffer based cell which has the chunkid at the 0th offset
+ * @see MemStoreLAB
+ */
+//TODO : When moving this cell to CellChunkMap we will have the following things
+// to be serialized
+// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes
+@InterfaceAudience.Private
+public class ByteBufferChunkCell extends ByteBufferKeyValue {
+ public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
+ super(buf, offset, length, seqId);
+ }
+
+ @Override
+ public int getChunkId() {
+ // The chunkId is embedded at the 0th offset of the bytebuffer
+ return ByteBufferUtils.toInt(buf, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
index 2cbf0a3..fc4aa0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -21,8 +21,10 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
* A chunk of memory out of which allocations are sliced.
@@ -46,13 +48,41 @@ public abstract class Chunk {
/** Size of chunk in bytes */
protected final int size;
+ // The unique id associated with the chunk.
+ private final int id;
+
+ // indicates if the chunk is formed by ChunkCreator#MemStoreChunkPool
+ private final boolean fromPool;
+
+ /**
+ * Create an uninitialized chunk. Note that memory is not allocated yet, so
+ * this is cheap.
+ * @param size in bytes
+ * @param id the chunk id
+ */
+ public Chunk(int size, int id) {
+ this(size, id, false);
+ }
+
/**
- * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
- *
+ * Create an uninitialized chunk. Note that memory is not allocated yet, so
+ * this is cheap.
* @param size in bytes
+ * @param id the chunk id
+ * @param fromPool if the chunk is formed by pool
*/
- Chunk(int size) {
+ public Chunk(int size, int id, boolean fromPool) {
this.size = size;
+ this.id = id;
+ this.fromPool = fromPool;
+ }
+
+ int getId() {
+ return this.id;
+ }
+
+ boolean isFromPool() {
+ return this.fromPool;
}
/**
@@ -60,7 +90,24 @@ public abstract class Chunk {
* constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
* until the allocation is complete.
*/
- public abstract void init();
+ public void init() {
+ assert nextFreeOffset.get() == UNINITIALIZED;
+ try {
+ allocateDataBuffer();
+ } catch (OutOfMemoryError e) {
+ boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+ assert failInit; // should be true.
+ throw e;
+ }
+ // Mark that it's ready for use
+ // Start allocations after the first 8 bytes, which are reserved for the chunk id
+ boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG);
+ // We should always succeed the above CAS since only one thread
+ // calls init()!
+ Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+ }
+
+ abstract void allocateDataBuffer();
/**
* Reset the offset to UNINITIALIZED before before reusing an old chunk
@@ -74,7 +121,8 @@ public abstract class Chunk {
/**
* Try to allocate <code>size</code> bytes from the chunk.
- *
+ * If alloc() is called before init() completes, the calling thread busy-waits, looping
+ * until the nextFreeOffset is set.
* @return the offset of the successful allocation, or -1 to indicate not-enough-space
*/
public int alloc(int size) {
@@ -96,7 +144,7 @@ public abstract class Chunk {
if (oldOffset + size > data.capacity()) {
return -1; // alloc doesn't fit
}
-
+ // TODO : If seqID is to be written add 8 bytes here for nextFreeOffset
// Try to atomically claim this chunk
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
// we got the alloc
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
new file mode 100644
index 0000000..d550148
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -0,0 +1,404 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.ref.SoftReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated
+ * with every chunk
+ */
+@InterfaceAudience.Private
+public class ChunkCreator {
+ private static final Log LOG = LogFactory.getLog(ChunkCreator.class);
+ // monotonically increasing chunkid
+ private AtomicInteger chunkID = new AtomicInteger(1);
+ // maps the chunk against the monotonically increasing chunk id. We need to preserve the
+ // natural ordering of the key
+ // CellChunkMap creation should convert the soft ref to hard reference
+ private Map<Integer, SoftReference<Chunk>> chunkIdMap =
+ new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
+ private final int chunkSize;
+ private final boolean offheap;
+ @VisibleForTesting
+ static ChunkCreator INSTANCE;
+ @VisibleForTesting
+ static boolean chunkPoolDisabled = false;
+ private MemStoreChunkPool pool;
+
+ @VisibleForTesting
+ ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage,
+ float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
+ this.chunkSize = chunkSize;
+ this.offheap = offheap;
+ this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage);
+ if (heapMemoryManager != null && this.pool != null) {
+ // Register with Heap Memory manager
+ heapMemoryManager.registerTuneObserver(this.pool);
+ }
+ }
+
+ /**
+ * Initializes the singleton instance of ChunkCreator
+ * @param chunkSize the chunkSize
+ * @param offheap indicates if the chunk is to be created offheap or not
+ * @param globalMemStoreSize the global memstore size
+ * @param poolSizePercentage pool size percentage
+ * @param initialCountPercentage the initial count of the chunk pool if any
+ * @param heapMemoryManager the heapmemory manager
+ * @return singleton ChunkCreator
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
+ justification = "Method is called by single thread at the starting of RS")
+ @VisibleForTesting
+ public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize,
+ float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
+ if (INSTANCE != null) return INSTANCE;
+ INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
+ initialCountPercentage, heapMemoryManager);
+ return INSTANCE;
+ }
+
+ static ChunkCreator getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Creates and inits a chunk.
+ * @return the chunk that was initialized
+ */
+ Chunk getChunk() {
+ Chunk chunk = null;
+ if (pool != null) {
+ // the pool creates the chunk internally. The chunk#init() call happens here
+ chunk = this.pool.getChunk();
+ // the pool has run out of maxCount
+ if (chunk == null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount()
+ + ". Creating chunk onheap.");
+ }
+ }
+ }
+ if (chunk == null) {
+ chunk = createChunk();
+ }
+ // put this chunk into the chunkIdMap
+ this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
+ // now we need to actually do the expensive memory allocation step in case of a new chunk,
+ // else only the offset is set to the beginning of the chunk to accept allocations
+ chunk.init();
+ return chunk;
+ }
+
+ private Chunk createChunk() {
+ return createChunk(false);
+ }
+
+ /**
+ * Creates the chunk either onheap or offheap
+ * @param pool indicates if the chunks have to be created which will be used by the Pool
+ * @return the chunk
+ */
+ private Chunk createChunk(boolean pool) {
+ int id = chunkID.getAndIncrement();
+ assert id > 0;
+ // do not create offheap chunk on demand
+ if (pool && this.offheap) {
+ return new OffheapChunk(chunkSize, id, pool);
+ } else {
+ return new OnheapChunk(chunkSize, id, pool);
+ }
+ }
+
+ @VisibleForTesting
+ // TODO : To be used by CellChunkMap
+ Chunk getChunk(int id) {
+ SoftReference<Chunk> ref = chunkIdMap.get(id);
+ if (ref != null) {
+ return ref.get();
+ }
+ return null;
+ }
+
+ int getChunkSize() {
+ return this.chunkSize;
+ }
+
+ boolean isOffheap() {
+ return this.offheap;
+ }
+
+ private void removeChunks(Set<Integer> chunkIDs) {
+ this.chunkIdMap.keySet().removeAll(chunkIDs);
+ }
+
+ Chunk removeChunk(int chunkId) {
+ SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
+ if (ref != null) {
+ return ref.get();
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ int size() {
+ return this.chunkIdMap.size();
+ }
+
+ @VisibleForTesting
+ void clearChunkIds() {
+ this.chunkIdMap.clear();
+ }
+
+ /**
+ * A pool of {@link Chunk} instances.
+ *
+ * MemStoreChunkPool caches a number of retired chunks for reusing, it could
+ * decrease allocating bytes when writing, thereby optimizing the garbage
+ * collection on JVM.
+ */
+ private class MemStoreChunkPool implements HeapMemoryTuneObserver {
+ private int maxCount;
+
+ // A queue of reclaimed chunks
+ private final BlockingQueue<Chunk> reclaimedChunks;
+ private final float poolSizePercentage;
+
+ /** Statistics thread schedule pool */
+ private final ScheduledExecutorService scheduleThreadPool;
+ /** Statistics thread */
+ private static final int statThreadPeriod = 60 * 5;
+ private final AtomicLong chunkCount = new AtomicLong();
+ private final AtomicLong reusedChunkCount = new AtomicLong();
+
+ MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) {
+ this.maxCount = maxCount;
+ this.poolSizePercentage = poolSizePercentage;
+ this.reclaimedChunks = new LinkedBlockingQueue<>();
+ for (int i = 0; i < initialCount; i++) {
+ Chunk chunk = createChunk(true);
+ chunk.init();
+ reclaimedChunks.add(chunk);
+ }
+ chunkCount.set(initialCount);
+ final String n = Thread.currentThread().getName();
+ scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+ .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
+ this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
+ statThreadPeriod, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
+ * not yet created max allowed chunks count. When we have already created max allowed chunks and
+ * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
+ * then.
+ * Note: Chunks returned by this pool must be put back to the pool after its use.
+ * @return a chunk
+ * @see #putbackChunks(Set)
+ */
+ Chunk getChunk() {
+ Chunk chunk = reclaimedChunks.poll();
+ if (chunk != null) {
+ chunk.reset();
+ reusedChunkCount.incrementAndGet();
+ } else {
+ // Make a chunk iff we have not yet created the maxCount chunks
+ while (true) {
+ long created = this.chunkCount.get();
+ if (created < this.maxCount) {
+ if (this.chunkCount.compareAndSet(created, created + 1)) {
+ chunk = createChunk(true);
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ return chunk;
+ }
+
+ /**
+ * Add the chunks to the pool. Once the pool reaches its max size, the remaining chunks
+ * are skipped.
+ * @param chunks
+ */
+ private void putbackChunks(Set<Integer> chunks) {
+ int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
+ Iterator<Integer> iterator = chunks.iterator();
+ while (iterator.hasNext()) {
+ Integer chunkId = iterator.next();
+ // always remove the chunk from the id map, whether or not it came from the pool
+ Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
+ if (chunk != null) {
+ if (chunk.isFromPool() && toAdd > 0) {
+ reclaimedChunks.add(chunk);
+ }
+ toAdd--;
+ }
+ }
+ }
+
+ private class StatisticsThread extends Thread {
+ StatisticsThread() {
+ super("MemStoreChunkPool.StatisticsThread");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ logStats();
+ }
+
+ private void logStats() {
+ if (!LOG.isDebugEnabled()) return;
+ long created = chunkCount.get();
+ long reused = reusedChunkCount.get();
+ long total = created + reused;
+ LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ + ",created chunk count=" + created
+ + ",reused chunk count=" + reused
+ + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
+ (float) reused / (float) total, 2)));
+ }
+ }
+
+ private int getMaxCount() {
+ return this.maxCount;
+ }
+
+ @Override
+ public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
+ // don't do any tuning in case of offheap memstore
+ if (isOffheap()) {
+ LOG.warn("Not tuning the chunk pool as it is offheap");
+ return;
+ }
+ int newMaxCount =
+ (int) (newMemstoreSize * poolSizePercentage / getChunkSize());
+ if (newMaxCount != this.maxCount) {
+ // We need an adjustment in the chunks numbers
+ if (newMaxCount > this.maxCount) {
+ // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
+ // create and add them to Q
+ LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
+ this.maxCount = newMaxCount;
+ } else {
+ // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
+ // itself. If the extra chunks are serving already, do not pool those when we get them back
+ LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
+ this.maxCount = newMaxCount;
+ if (this.reclaimedChunks.size() > newMaxCount) {
+ synchronized (this) {
+ while (this.reclaimedChunks.size() > newMaxCount) {
+ this.reclaimedChunks.poll();
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ static void clearDisableFlag() {
+ chunkPoolDisabled = false;
+ }
+
+ private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage,
+ float initialCountPercentage) {
+ if (poolSizePercentage <= 0) {
+ LOG.info("PoolSizePercentage is less than 0. So not using pool");
+ return null;
+ }
+ if (chunkPoolDisabled) {
+ return null;
+ }
+ if (poolSizePercentage > 1.0) {
+ throw new IllegalArgumentException(
+ MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
+ }
+ int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize());
+ if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+ throw new IllegalArgumentException(
+ MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
+ }
+ int initialCount = (int) (initialCountPercentage * maxCount);
+ LOG.info("Allocating MemStoreChunkPool with chunk size "
+ + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount
+ + ", initial count " + initialCount);
+ return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage);
+ }
+
+ @VisibleForTesting
+ int getMaxCount() {
+ if (pool != null) {
+ return pool.getMaxCount();
+ }
+ return 0;
+ }
+
+ @VisibleForTesting
+ int getPoolSize() {
+ if (pool != null) {
+ return pool.reclaimedChunks.size();
+ }
+ return 0;
+ }
+
+ /*
+ * Only used in testing
+ */
+ @VisibleForTesting
+ void clearChunksInPool() {
+ if (pool != null) {
+ pool.reclaimedChunks.clear();
+ }
+ }
+
+ synchronized void putbackChunks(Set<Integer> chunks) {
+ if (pool != null) {
+ pool.putbackChunks(chunks);
+ } else {
+ this.removeChunks(chunks);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d14571b..c197418 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements
startServiceThreads();
startHeapMemoryManager();
// Call it after starting HeapMemoryManager.
- initializeMemStoreChunkPool();
+ initializeMemStoreChunkCreator();
LOG.info("Serving as " + this.serverName +
", RpcServer on " + rpcServices.isa +
", sessionid=0x" +
@@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements
}
}
- private void initializeMemStoreChunkPool() {
+ protected void initializeMemStoreChunkCreator() {
if (MemStoreLAB.isEnabled(conf)) {
// MSLAB is enabled. So initialize MemStoreChunkPool
// By this time, the MemstoreFlusher is already initialized. We can get the global limits from
@@ -1506,12 +1506,10 @@ public class HRegionServer extends HasThread implements
float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
- MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
- initialCountPercentage, chunkSize, offheap);
- if (pool != null && this.hMemManager != null) {
- // Register with Heap Memory manager
- this.hMemManager.registerTuneObserver(pool);
- }
+ // init the chunkCreator
+ ChunkCreator chunkCreator =
+ ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
+ initialCountPercentage, this.hMemManager);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
deleted file mode 100644
index b7ac212..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * A pool of {@link Chunk} instances.
- *
- * MemStoreChunkPool caches a number of retired chunks for reusing, it could
- * decrease allocating bytes when writing, thereby optimizing the garbage
- * collection on JVM.
- *
- * The pool instance is globally unique and could be obtained through
- * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)}
- *
- * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
- * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
- * when MemStore clearing snapshot for flush
- */
-@SuppressWarnings("javadoc")
-@InterfaceAudience.Private
-public class MemStoreChunkPool implements HeapMemoryTuneObserver {
- private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
-
- // Static reference to the MemStoreChunkPool
- static MemStoreChunkPool GLOBAL_INSTANCE;
- /** Boolean whether we have disabled the memstore chunk pool entirely. */
- static boolean chunkPoolDisabled = false;
-
- private int maxCount;
-
- // A queue of reclaimed chunks
- private final BlockingQueue<Chunk> reclaimedChunks;
- private final int chunkSize;
- private final float poolSizePercentage;
-
- /** Statistics thread schedule pool */
- private final ScheduledExecutorService scheduleThreadPool;
- /** Statistics thread */
- private static final int statThreadPeriod = 60 * 5;
- private final AtomicLong chunkCount = new AtomicLong();
- private final AtomicLong reusedChunkCount = new AtomicLong();
- private final boolean offheap;
-
- MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage,
- boolean offheap) {
- this.maxCount = maxCount;
- this.chunkSize = chunkSize;
- this.poolSizePercentage = poolSizePercentage;
- this.offheap = offheap;
- this.reclaimedChunks = new LinkedBlockingQueue<>();
- for (int i = 0; i < initialCount; i++) {
- Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize);
- chunk.init();
- reclaimedChunks.add(chunk);
- }
- chunkCount.set(initialCount);
- final String n = Thread.currentThread().getName();
- scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
- .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
- this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
- statThreadPeriod, TimeUnit.SECONDS);
- }
-
- /**
- * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
- * not yet created max allowed chunks count. When we have already created max allowed chunks and
- * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
- * then.
- * Note: Chunks returned by this pool must be put back to the pool after its use.
- * @return a chunk
- * @see #putbackChunk(Chunk)
- * @see #putbackChunks(BlockingQueue)
- */
- Chunk getChunk() {
- Chunk chunk = reclaimedChunks.poll();
- if (chunk != null) {
- chunk.reset();
- reusedChunkCount.incrementAndGet();
- } else {
- // Make a chunk iff we have not yet created the maxCount chunks
- while (true) {
- long created = this.chunkCount.get();
- if (created < this.maxCount) {
- chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize);
- if (this.chunkCount.compareAndSet(created, created + 1)) {
- break;
- }
- } else {
- break;
- }
- }
- }
- return chunk;
- }
-
- /**
- * Add the chunks to the pool, when the pool achieves the max size, it will
- * skip the remaining chunks
- * @param chunks
- */
- synchronized void putbackChunks(BlockingQueue<Chunk> chunks) {
- int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
- Chunk chunk = null;
- while ((chunk = chunks.poll()) != null && toAdd > 0) {
- reclaimedChunks.add(chunk);
- toAdd--;
- }
- }
-
- /**
- * Add the chunk to the pool, if the pool has achieved the max size, it will
- * skip it
- * @param chunk
- */
- synchronized void putbackChunk(Chunk chunk) {
- if (reclaimedChunks.size() < this.maxCount) {
- reclaimedChunks.add(chunk);
- }
- }
-
- int getPoolSize() {
- return this.reclaimedChunks.size();
- }
-
- /*
- * Only used in testing
- */
- void clearChunks() {
- this.reclaimedChunks.clear();
- }
-
- private class StatisticsThread extends Thread {
- StatisticsThread() {
- super("MemStoreChunkPool.StatisticsThread");
- setDaemon(true);
- }
-
- @Override
- public void run() {
- logStats();
- }
-
- private void logStats() {
- if (!LOG.isDebugEnabled()) return;
- long created = chunkCount.get();
- long reused = reusedChunkCount.get();
- long total = created + reused;
- LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
- + ",created chunk count=" + created
- + ",reused chunk count=" + reused
- + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
- (float) reused / (float) total, 2)));
- }
- }
-
- /**
- * @return the global MemStoreChunkPool instance
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
- justification = "Method is called by single thread at the starting of RS")
- static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
- float initialCountPercentage, int chunkSize, boolean offheap) {
- if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
- if (chunkPoolDisabled) return null;
-
- if (poolSizePercentage <= 0) {
- chunkPoolDisabled = true;
- return null;
- }
- if (poolSizePercentage > 1.0) {
- throw new IllegalArgumentException(
- MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
- }
- int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
- if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
- throw new IllegalArgumentException(
- MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
- }
- int initialCount = (int) (initialCountPercentage * maxCount);
- LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
- + ", max count " + maxCount + ", initial count " + initialCount);
- GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
- offheap);
- return GLOBAL_INSTANCE;
- }
-
- /**
- * @return The singleton instance of this pool.
- */
- static MemStoreChunkPool getPool() {
- return GLOBAL_INSTANCE;
- }
-
- int getMaxCount() {
- return this.maxCount;
- }
-
- @VisibleForTesting
- static void clearDisableFlag() {
- chunkPoolDisabled = false;
- }
-
- @Override
- public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
- // don't do any tuning in case of offheap memstore
- if (this.offheap) {
- LOG.warn("Not tuning the chunk pool as it is offheap");
- return;
- }
- int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
- if (newMaxCount != this.maxCount) {
- // We need an adjustment in the chunks numbers
- if (newMaxCount > this.maxCount) {
- // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
- // create and add them to Q
- LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
- this.maxCount = newMaxCount;
- } else {
- // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
- // itself. If the extra chunks are serving already, do not pool those when we get them back
- LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
- this.maxCount = newMaxCount;
- if (this.reclaimedChunks.size() > newMaxCount) {
- synchronized (this) {
- while (this.reclaimedChunks.size() > newMaxCount) {
- this.reclaimedChunks.poll();
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index f6d1607..72e937c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
* <p>
* The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from
* and then doles it out to threads that request slices into the array. These chunks can get pooled
- * as well. See {@link MemStoreChunkPool}.
+ * as well. See {@link ChunkCreator}.
* <p>
* The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
* Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
* {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this
* cell's data and copies into this area and then recreate a Cell over this copied data.
* <p>
- * @see MemStoreChunkPool
+ * @see ChunkCreator
*/
@InterfaceAudience.Private
public interface MemStoreLAB {
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 4e87135..4fba82d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -18,23 +18,26 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.nio.ByteBuffer;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-
/**
* A memstore-local allocation buffer.
* <p>
@@ -55,8 +58,8 @@ import com.google.common.base.Preconditions;
* would provide a performance improvement - probably would speed up the
* Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
* anyway.
- * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}.
- * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks,
+ * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
+ * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks,
* which this MemStoreLAB creates on its own (when no chunk available from pool), those will be
* always on heap backed.
*/
@@ -66,14 +69,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class);
private AtomicReference<Chunk> curChunk = new AtomicReference<>();
- // A queue of chunks from pool contained by this memstore LAB
- // TODO: in the future, it would be better to have List implementation instead of Queue,
- // as FIFO order is not so important here
+ // Lock to manage multiple handlers requesting for a chunk
+ private ReentrantLock lock = new ReentrantLock();
+
+ // A set of chunks contained by this memstore LAB
@VisibleForTesting
- BlockingQueue<Chunk> pooledChunkQueue = null;
+ Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
private final int chunkSize;
private final int maxAlloc;
- private final MemStoreChunkPool chunkPool;
+ private final ChunkCreator chunkCreator;
// This flag is for closing this instance, its set when clearing snapshot of
// memstore
@@ -92,20 +96,12 @@ public class MemStoreLABImpl implements MemStoreLAB {
public MemStoreLABImpl(Configuration conf) {
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
- this.chunkPool = MemStoreChunkPool.getPool();
- // currently chunkQueue is only used for chunkPool
- if (this.chunkPool != null) {
- // set queue length to chunk pool max count to avoid keeping reference of
- // too many non-reclaimable chunks
- pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
- }
-
+ this.chunkCreator = ChunkCreator.getInstance();
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(maxAlloc <= chunkSize,
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
}
-
@Override
public Cell copyCellInto(Cell cell) {
int size = KeyValueUtil.length(cell);
@@ -118,19 +114,52 @@ public class MemStoreLABImpl implements MemStoreLAB {
Chunk c = null;
int allocOffset = 0;
while (true) {
+ // Try to get the chunk
c = getOrMakeChunk();
+ // we may get null because some other thread succeeded in getting the lock
+ // and so the current thread has to try again to make its chunk or grab the chunk
+ // that the other thread created
// Try to allocate from this chunk
- allocOffset = c.alloc(size);
- if (allocOffset != -1) {
- // We succeeded - this is the common case - small alloc
- // from a big buffer
- break;
+ if (c != null) {
+ allocOffset = c.alloc(size);
+ if (allocOffset != -1) {
+ // We succeeded - this is the common case - small alloc
+ // from a big buffer
+ break;
+ }
+ // not enough space!
+ // try to retire this chunk
+ tryRetireChunk(c);
}
- // not enough space!
- // try to retire this chunk
- tryRetireChunk(c);
}
- return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
+ return copyToChunkCell(cell, c.getData(), allocOffset, size);
+ }
+
+ /**
+ * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
+ * out of it
+ */
+ private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
+ int tagsLen = cell.getTagsLength();
+ if (cell instanceof ExtendedCell) {
+ ((ExtendedCell) cell).write(buf, offset);
+ } else {
+ // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
+ // other case also. The data fragments within Cell is copied into buf as in KeyValue
+ // serialization format only.
+ KeyValueUtil.appendTo(cell, buf, offset, true);
+ }
+ // TODO : write the seqid here. For writing seqId we should create a new cell type so
+ // that seqId is not used as the state
+ if (tagsLen == 0) {
+ // When tagsLen is 0, make a NoTagByteBufferChunkCell (a NoTagsByteBufferKeyValue subclass).
+ // This is an optimized class which directly returns tagsLen as 0. So we avoid parsing many
+ // length components in reading the tagLength stored in the backing buffer. The Memstore
+ // addition of every Cell calls getTagsLength().
+ return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
+ } else {
+ return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
+ }
}
/**
@@ -142,9 +171,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
this.closed = true;
// We could put back the chunks to pool for reusing only when there is no
// opening scanner which will read their data
- if (chunkPool != null && openScannerCount.get() == 0
- && reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.pooledChunkQueue);
+ int count = openScannerCount.get();
+ if(count == 0) {
+ recycleChunks();
}
}
@@ -162,9 +191,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
- if (this.closed && chunkPool != null && count == 0
- && reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.pooledChunkQueue);
+ if (this.closed && count == 0) {
+ recycleChunks();
+ }
+ }
+
+ private void recycleChunks() {
+ if (reclaimed.compareAndSet(false, true)) {
+ chunkCreator.putbackChunks(chunks);
}
}
@@ -190,45 +224,33 @@ public class MemStoreLABImpl implements MemStoreLAB {
* allocate a new one from the JVM.
*/
private Chunk getOrMakeChunk() {
- while (true) {
- // Try to get the chunk
- Chunk c = curChunk.get();
- if (c != null) {
- return c;
- }
-
- // No current chunk, so we want to allocate one. We race
- // against other allocators to CAS in an uninitialized chunk
- // (which is cheap to allocate)
- if (chunkPool != null) {
- c = chunkPool.getChunk();
- }
- boolean pooledChunk = false;
- if (c != null) {
- // This is chunk from pool
- pooledChunk = true;
- } else {
- c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap.
- }
- if (curChunk.compareAndSet(null, c)) {
- // we won race - now we need to actually do the expensive
- // allocation step
- c.init();
- if (pooledChunk) {
- if (!this.closed && !this.pooledChunkQueue.offer(c)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
- + pooledChunkQueue.size());
- }
- }
+ // Try to get the chunk
+ Chunk c = curChunk.get();
+ if (c != null) {
+ return c;
+ }
+ // No current chunk, so we want to allocate one. We race
+ // against other allocators to CAS in an uninitialized chunk
+ // (which is cheap to allocate)
+ if (lock.tryLock()) {
+ try {
+ // once again check inside the lock
+ c = curChunk.get();
+ if (c != null) {
+ return c;
}
- return c;
- } else if (pooledChunk) {
- chunkPool.putbackChunk(c);
+ c = this.chunkCreator.getChunk();
+ if (c != null) {
+ // set the curChunk. No need of CAS as only one thread will be here
+ curChunk.set(c);
+ chunks.add(c.getId());
+ return c;
+ }
+ } finally {
+ lock.unlock();
}
- // someone else won race - that's fine, we'll try to grab theirs
- // in the next iteration of the loop.
}
+ return null;
}
@VisibleForTesting
@@ -236,8 +258,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
return this.curChunk.get();
}
-
+ @VisibleForTesting
BlockingQueue<Chunk> getPooledChunks() {
- return this.pooledChunkQueue;
+ BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
+ for (Integer id : this.chunks) {
+ Chunk chunk = chunkCreator.getChunk(id);
+ if (chunk != null && chunk.isFromPool()) {
+ pooledChunks.add(chunk);
+ }
+ }
+ return pooledChunks;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
new file mode 100644
index 0000000..a8ba50c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+
+/**
+ * ByteBuffer based cell which has the chunkid at the 0th offset and with no tags
+ * @see MemStoreLAB
+ */
+@InterfaceAudience.Private
+public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue {
+
+ public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
+ super(buf, offset, length, seqId);
+ }
+
+ @Override
+ public int getChunkId() {
+ // The chunkId is embedded at the 0th offset of the bytebuffer
+ return ByteBufferUtils.toInt(buf, 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
index ed98cfa..e244a33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
@@ -21,34 +21,27 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import com.google.common.base.Preconditions;
-
/**
* An off heap chunk implementation.
*/
@InterfaceAudience.Private
public class OffheapChunk extends Chunk {
- OffheapChunk(int size) {
- super(size);
+ OffheapChunk(int size, int id) {
+ // better if this is always created fromPool. This should not be called
+ super(size, id);
+ }
+
+ OffheapChunk(int size, int id, boolean fromPool) {
+ super(size, id, fromPool);
+ assert fromPool == true;
}
@Override
- public void init() {
- assert nextFreeOffset.get() == UNINITIALIZED;
- try {
- if (data == null) {
- data = ByteBuffer.allocateDirect(this.size);
- }
- } catch (OutOfMemoryError e) {
- boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
- assert failInit; // should be true.
- throw e;
+ void allocateDataBuffer() {
+ if (data == null) {
+ data = ByteBuffer.allocateDirect(this.size);
+ data.putInt(0, this.getId()); // int header: cells read the id via toInt(buf, 0)
}
- // Mark that it's ready for use
- boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
- // We should always succeed the above CAS since only one thread
- // calls init()!
- Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
index bd33cb5..da34e24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
@@ -21,33 +21,25 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import com.google.common.base.Preconditions;
-
/**
* An on heap chunk implementation.
*/
@InterfaceAudience.Private
public class OnheapChunk extends Chunk {
- OnheapChunk(int size) {
- super(size);
+ OnheapChunk(int size, int id) {
+ super(size, id);
+ }
+
+ OnheapChunk(int size, int id, boolean fromPool) {
+ super(size, id, fromPool);
}
- public void init() {
- assert nextFreeOffset.get() == UNINITIALIZED;
- try {
- if (data == null) {
- data = ByteBuffer.allocate(this.size);
- }
- } catch (OutOfMemoryError e) {
- boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
- assert failInit; // should be true.
- throw e;
+ @Override
+ void allocateDataBuffer() {
+ if (data == null) {
+ data = ByteBuffer.allocate(this.size);
+ data.putInt(0, this.getId()); // int header: cells read the id via toInt(buf, 0)
}
- // Mark that it's ready for use
- boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
- // We should always succeed the above CAS since only one thread
- // calls init()!
- Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index acf2af0..c0ddbfc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -95,6 +95,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -2424,6 +2426,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
final Configuration conf, final HTableDescriptor htd, boolean initialize)
throws IOException {
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
WAL wal = createWal(conf, rootDir, info);
return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 422c54b..8d8b6df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -49,8 +50,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -397,6 +400,7 @@ public class TestCoprocessorInterface {
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
Path path = new Path(DIR + callingMethod);
Region r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 80d0e3a..b99087d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -47,10 +47,12 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -152,6 +154,7 @@ public class TestRegionObserverScannerOpenHook {
for (byte[] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
Path path = new Path(DIR + callingMethod);
WAL wal = HBaseTestingUtility.createWal(conf, path, info);
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
index 2e44dee..15d449d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@@ -100,6 +102,7 @@ public class TestRegionObserverStacking extends TestCase {
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
Path path = new Path(DIR + callingMethod);
HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
index f1775d0..fae7247 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
@@ -40,8 +40,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -342,6 +344,7 @@ public class TestScannerFromBucketCache {
private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
byte[]... families) throws IOException {
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log");
HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index cc73d9d..32bce26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Triple;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -88,6 +91,10 @@ public class TestCatalogJanitor {
@Rule
public TestName name = new TestName();
+ @BeforeClass
+ public static void setup() throws Exception {
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+ }
/**
* Mock MasterServices for tests below.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 418aadf..096c5ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -241,7 +241,7 @@ public class TestBulkLoad {
for (byte[] family : families) {
hTableDescriptor.addFamily(new HColumnDescriptor(family));
}
-
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
// TODO We need a way to do this without creating files
return HRegion.createHRegion(hRegionInfo,
new Path(testFolder.newFolder().toURI()),
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 3b4d068..09877b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase {
descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true);
CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
- MemStoreChunkPool.chunkPoolDisabled = false;
+ ChunkCreator.chunkPoolDisabled = false;
}
/* Create and test CellSet based on CellArrayMap */
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index a888c45..9e90f3e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +51,7 @@ import static org.junit.Assert.assertTrue;
public class TestCompactingMemStore extends TestDefaultMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class);
- protected static MemStoreChunkPool chunkPool;
+ protected static ChunkCreator chunkCreator;
protected HRegion region;
protected RegionServicesForStores regionServicesForStores;
protected HStore store;
@@ -65,7 +66,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@After
public void tearDown() throws Exception {
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
}
@Override
@@ -84,15 +85,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
- this.region = hbaseUtility.createTestRegion("foobar", hcd);
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
+ htd.addFamily(hcd);
+ HRegionInfo info =
+ new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
+ WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
+ this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
+ //this.region = hbaseUtility.createTestRegion("foobar", hcd);
this.regionServicesForStores = region.getRegionServicesForStores();
this.store = new HStore(region, hcd, conf);
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
- chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
- MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
- assertTrue(chunkPool != null);
+ chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+ globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+ assertTrue(chunkCreator != null);
}
/**
@@ -390,7 +397,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
memstore.clearSnapshot(snapshot.getId());
- int chunkCount = chunkPool.getPoolSize();
+ int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
@@ -434,16 +441,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() == 0);
+ assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
// Creating another snapshot
@@ -464,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
@@ -516,16 +523,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
assertEquals(3, memstore.getActive().getCellsCount());
- assertTrue(chunkPool.getPoolSize() == 0);
+ assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
// Creating another snapshot
@@ -553,7 +560,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
}
//////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 5a48455..66e107a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -44,17 +44,13 @@ import java.util.List;
public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
- //private static MemStoreChunkPool chunkPool;
- //private HRegion region;
- //private RegionServicesForStores regionServicesForStores;
- //private HStore store;
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
@Override public void tearDown() throws Exception {
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
}
@Override public void setUp() throws Exception {
@@ -408,16 +404,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() == 0);
+ assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
// Creating another snapshot
@@ -438,7 +434,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
@@ -472,7 +468,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
memstore.clearSnapshot(snapshot.getId());
- int chunkCount = chunkPool.getPoolSize();
+ int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
index 8e85730..e320368 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
@@ -164,6 +164,7 @@ public class TestCompactionArchiveConcurrentClose {
HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf),
tableDir, info);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName());
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
index 89b2368..e7fcf18 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
@@ -174,6 +174,7 @@ public class TestCompactionArchiveIOException {
private HRegion initHRegion(HTableDescriptor htd, HRegionInfo info)
throws IOException {
Configuration conf = testUtil.getConfiguration();
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
Path regionDir = new Path(tableDir, info.getEncodedName());
Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString());
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
index 58dbe8d..543ca6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
@@ -101,6 +101,7 @@ public class TestCompactionPolicy {
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
hlog = new FSHLog(fs, basedir, logName, conf);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
region.close();
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 7434eb1..41b304b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -84,6 +85,7 @@ public class TestDefaultMemStore {
protected static final byte[] FAMILY = Bytes.toBytes("column");
protected MultiVersionConcurrencyControl mvcc;
protected AtomicLong startSeqNum = new AtomicLong(0);
+ protected ChunkCreator chunkCreator;
private String getName() {
return this.name.getMethodName();
@@ -92,9 +94,17 @@ public class TestDefaultMemStore {
@Before
public void setUp() throws Exception {
internalSetUp();
+ // no pool
+ this.chunkCreator =
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
this.memstore = new DefaultMemStore();
}
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ ChunkCreator.getInstance().clearChunkIds();
+ }
+
protected void internalSetUp() throws Exception {
this.mvcc = new MultiVersionConcurrencyControl();
}
@@ -129,7 +139,9 @@ public class TestDefaultMemStore {
assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
// make sure chunk size increased even when writing the same cell, if using MSLAB
if (msLab instanceof MemStoreLABImpl) {
- assertEquals(2 * Segment.getCellLength(kv),
+ // since we add the chunkID at the 0th offset of the chunk and the
+ // chunkid is a long we need to account for those 8 bytes
+ assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG,
((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset());
}
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 73fb9cf..24e850d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -266,6 +266,7 @@ public class TestFailedAppendAndSync {
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/972e8c8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index b416c7d..0f24a24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -153,7 +153,7 @@ public class TestHMobStore {
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
-
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, basedir);
final WALFactory wals = new WALFactory(walConf, null, methodName);