Posted to commits@hbase.apache.org by sy...@apache.org on 2017/04/17 20:55:27 UTC

[48/50] [abbrv] hbase git commit: HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)

HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c2c2178b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c2c2178b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c2c2178b

Branch: refs/heads/hbase-12439
Commit: c2c2178b2eebe4439eadec6b37fae2566944c16b
Parents: c8cd921
Author: Ramkrishna <ra...@intel.com>
Authored: Mon Apr 17 09:10:59 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Mon Apr 17 09:28:24 2017 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/CellUtil.java  |  24 --
 .../org/apache/hadoop/hbase/ExtendedCell.java   |  10 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   2 +
 .../hbase/regionserver/ByteBufferChunkCell.java |  48 +++
 .../apache/hadoop/hbase/regionserver/Chunk.java |  60 ++-
 .../hadoop/hbase/regionserver/ChunkCreator.java | 404 +++++++++++++++++++
 .../hbase/regionserver/HRegionServer.java       |  14 +-
 .../hbase/regionserver/MemStoreChunkPool.java   | 265 ------------
 .../hadoop/hbase/regionserver/MemStoreLAB.java  |   4 +-
 .../hbase/regionserver/MemStoreLABImpl.java     | 171 ++++----
 .../regionserver/NoTagByteBufferChunkCell.java  |  48 +++
 .../hadoop/hbase/regionserver/OffheapChunk.java |  31 +-
 .../hadoop/hbase/regionserver/OnheapChunk.java  |  32 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |   3 +
 .../coprocessor/TestCoprocessorInterface.java   |   4 +
 .../TestRegionObserverScannerOpenHook.java      |   3 +
 .../coprocessor/TestRegionObserverStacking.java |   3 +
 .../io/hfile/TestScannerFromBucketCache.java    |   3 +
 .../hadoop/hbase/master/TestCatalogJanitor.java |   7 +
 .../hadoop/hbase/regionserver/TestBulkLoad.java |   2 +-
 .../hbase/regionserver/TestCellFlatSet.java     |   2 +-
 .../regionserver/TestCompactingMemStore.java    |  37 +-
 .../TestCompactingToCellArrayMapMemStore.java   |  16 +-
 .../TestCompactionArchiveConcurrentClose.java   |   1 +
 .../TestCompactionArchiveIOException.java       |   1 +
 .../regionserver/TestCompactionPolicy.java      |   1 +
 .../hbase/regionserver/TestDefaultMemStore.java |  14 +-
 .../regionserver/TestFailedAppendAndSync.java   |   1 +
 .../hbase/regionserver/TestHMobStore.java       |   2 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |   2 +
 .../regionserver/TestHRegionReplayEvents.java   |   2 +-
 .../regionserver/TestMemStoreChunkPool.java     |  48 +--
 .../hbase/regionserver/TestMemStoreLAB.java     |  27 +-
 .../TestMemstoreLABWithoutPool.java             | 168 ++++++++
 .../hbase/regionserver/TestRecoveredEdits.java  |   1 +
 .../hbase/regionserver/TestRegionIncrement.java |   1 +
 .../hadoop/hbase/regionserver/TestStore.java    |   1 +
 .../TestStoreFileRefresherChore.java            |   1 +
 .../hbase/regionserver/TestWALLockup.java       |   1 +
 .../TestWALMonotonicallyIncreasingSeqId.java    |   1 +
 .../hbase/regionserver/wal/TestDurability.java  |   3 +
 41 files changed, 990 insertions(+), 479 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index e1bc969..56de21b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -3135,28 +3135,4 @@ public final class CellUtil {
       return Type.DeleteFamily.getCode();
     }
   }
-
-  /**
-   * Clone the passed cell by copying its data into the passed buf.
-   */
-  public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) {
-    int tagsLen = cell.getTagsLength();
-    if (cell instanceof ExtendedCell) {
-      ((ExtendedCell) cell).write(buf, offset);
-    } else {
-      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
-      // other case also. The data fragments within Cell is copied into buf as in KeyValue
-      // serialization format only.
-      KeyValueUtil.appendTo(cell, buf, offset, true);
-    }
-    if (tagsLen == 0) {
-      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
-      // which directly return tagsLen as 0. So we avoid parsing many length components in
-      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
-      // call getTagsLength().
-      return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
-    } else {
-      return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
index 517873f..10f20ca 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
 public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize,
     Cloneable {
 
+  public static int CELL_NOT_BASED_ON_CHUNK = -1;
   /**
    * Write this cell to an OutputStream in a {@link KeyValue} format.
    * <br> KeyValue format <br>
@@ -73,4 +74,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
    * @return The deep cloned cell
    */
   Cell deepClone();
+
+  /**
+   * Extracts the id of the backing ByteBuffer of this cell if it was obtained from fixed-sized
+   * chunks, as in the case of MemStoreLAB
+   * @return the chunk id if the cell is backed by fixed-sized Chunks, else -1
+   */
+  default int getChunkId() {
+    return CELL_NOT_BASED_ON_CHUNK;
+  }
 }
\ No newline at end of file
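
For illustration, a minimal sketch of the default-method pattern the interface
relies on (hypothetical names, not the HBase types): most implementations
inherit the -1 sentinel for free, and only chunk-backed cells override it.

    interface IdAwareCell {
      int CELL_NOT_BASED_ON_CHUNK = -1;

      // ordinary cells are not chunk-backed, so the default applies
      default int getChunkId() {
        return CELL_NOT_BASED_ON_CHUNK;
      }
    }

    class HeapCell implements IdAwareCell {
      // inherits getChunkId() == -1
    }

    class ChunkBackedCell implements IdAwareCell {
      private final int chunkId;
      ChunkBackedCell(int chunkId) { this.chunkId = chunkId; }
      @Override public int getChunkId() { return chunkId; }
    }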

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bb9f282..f9670e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -748,6 +748,8 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     this.masterActiveTime = System.currentTimeMillis();
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
+    // Initialize the chunkCreator
+    initializeMemStoreChunkCreator();
     this.fileSystemManager = new MasterFileSystem(this);
     this.walManager = new MasterWalManager(this);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
new file mode 100644
index 0000000..a8f1000
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * A ByteBuffer backed cell which has the chunkId embedded at the 0th offset of its buffer
+ * @see MemStoreLAB
+ */
+//TODO : When moving this cell to CellChunkMap, the following will need
+// to be serialized:
+// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes
+@InterfaceAudience.Private
+public class ByteBufferChunkCell extends ByteBufferKeyValue {
+  public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
+    super(buf, offset, length, seqId);
+  }
+
+  @Override
+  public int getChunkId() {
+    // The chunkId is embedded at the 0th offset of the bytebuffer
+    return ByteBufferUtils.toInt(buf, 0);
+  }
+}
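
A self-contained sketch of the id-in-buffer idea using plain NIO. Assumptions:
the patch reserves 8 bytes and writes the id via putLong in the chunk's buffer
and reads it back with ByteBufferUtils.toInt; this sketch stores the id as a
4-byte int purely for brevity.

    import java.nio.ByteBuffer;

    public class ChunkIdRoundTrip {
      public static void main(String[] args) {
        int chunkId = 42;
        ByteBuffer chunk = ByteBuffer.allocate(1024);
        chunk.putInt(0, chunkId);               // embed the id in the header
        // any cell handle over the same buffer can recover the id cheaply,
        // without keeping a back-reference to the Chunk object
        System.out.println("chunk id = " + chunk.getInt(0));   // prints 42
      }
    }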

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
index 2cbf0a3..fc4aa0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -21,8 +21,10 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * A chunk of memory out of which allocations are sliced.
@@ -46,13 +48,41 @@ public abstract class Chunk {
   /** Size of chunk in bytes */
   protected final int size;
 
+  // The unique id associated with the chunk.
+  private final int id;
+
+  // indicates if the chunk was obtained from the pool in ChunkCreator
+  private final boolean fromPool;
+
+  /**
+   * Create an uninitialized chunk. Note that memory is not allocated yet, so
+   * this is cheap.
+   * @param size in bytes
+   * @param id the chunk id
+   */
+  public Chunk(int size, int id) {
+    this(size, id, false);
+  }
+
   /**
-   * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
-   *
+   * Create an uninitialized chunk. Note that memory is not allocated yet, so
+   * this is cheap.
    * @param size in bytes
+   * @param id the chunk id
+   * @param fromPool whether the chunk was obtained from the pool
    */
-  Chunk(int size) {
+  public Chunk(int size, int id, boolean fromPool) {
     this.size = size;
+    this.id = id;
+    this.fromPool = fromPool;
+  }
+
+  int getId() {
+    return this.id;
+  }
+
+  boolean isFromPool() {
+    return this.fromPool;
   }
 
   /**
@@ -60,7 +90,24 @@ public abstract class Chunk {
    * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
    * until the allocation is complete.
    */
-  public abstract void init();
+  public void init() {
+    assert nextFreeOffset.get() == UNINITIALIZED;
+    try {
+      allocateDataBuffer();
+    } catch (OutOfMemoryError e) {
+      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+      assert failInit; // should be true.
+      throw e;
+    }
+    // Mark that it's ready for use
+    // Start allocations at offset 8, since the first 8 bytes hold the chunkId
+    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG);
+    // We should always succeed the above CAS since only one thread
+    // calls init()!
+    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+  }
+
+  abstract void allocateDataBuffer();
 
   /**
   * Reset the offset to UNINITIALIZED before reusing an old chunk
@@ -74,7 +121,8 @@ public abstract class Chunk {
 
   /**
    * Try to allocate <code>size</code> bytes from the chunk.
-   *
+   * If an allocation is attempted before init() has been called, the allocating thread
+   * busy-waits, looping until nextFreeOffset is set.
    * @return the offset of the successful allocation, or -1 to indicate not-enough-space
    */
   public int alloc(int size) {
@@ -96,7 +144,7 @@ public abstract class Chunk {
       if (oldOffset + size > data.capacity()) {
         return -1; // alloc doesn't fit
       }
-
+      // TODO : If seqID is to be written add 8 bytes here for nextFreeOffset
       // Try to atomically claim this chunk
       if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
         // we got the alloc
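
Condensed, the allocation path is a classic CAS bump-the-pointer loop. A
sketch (ignoring the UNINITIALIZED busy-wait and the retire path), with
allocations starting past the 8-byte id header:

    import java.util.concurrent.atomic.AtomicInteger;

    class BumpPointerSketch {
      static final int HEADER = 8;                 // first 8 bytes hold the chunk id
      final AtomicInteger nextFreeOffset = new AtomicInteger(HEADER);
      final int capacity = 2 * 1024 * 1024;        // default 2MB chunk

      /** @return offset of the allocation, or -1 if the chunk is full */
      int alloc(int size) {
        while (true) {
          int oldOffset = nextFreeOffset.get();
          if (oldOffset + size > capacity) {
            return -1;                             // alloc doesn't fit
          }
          if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
            return oldOffset;                      // we got the alloc
          }
          // lost the race; loop and retry against the new offset
        }
      }
    }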

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
new file mode 100644
index 0000000..073fb25
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -0,0 +1,404 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.ref.SoftReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manages the creation of MemStoreLAB chunks. A monotonically increasing id is associated
+ * with every chunk
+ */
+@InterfaceAudience.Private
+public class ChunkCreator {
+  private final Log LOG = LogFactory.getLog(ChunkCreator.class);
+  // monotonically increasing chunkid
+  private AtomicInteger chunkID = new AtomicInteger(1);
+  // maps each monotonically increasing chunk id to its chunk. We need to preserve the
+  // natural ordering of the keys
+  // CellChunkMap creation should convert the soft reference into a hard reference
+  private Map<Integer, SoftReference<Chunk>> chunkIdMap =
+      new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
+  private final int chunkSize;
+  private final boolean offheap;
+  @VisibleForTesting
+  static ChunkCreator INSTANCE;
+  @VisibleForTesting
+  static boolean chunkPoolDisabled = false;
+  private MemStoreChunkPool pool;
+
+  @VisibleForTesting
+  ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage,
+      float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
+    this.chunkSize = chunkSize;
+    this.offheap = offheap;
+    this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage);
+    if (heapMemoryManager != null && this.pool != null) {
+      // Register with Heap Memory manager
+      heapMemoryManager.registerTuneObserver(this.pool);
+    }
+  }
+
+  /**
+   * Initializes the singleton instance of ChunkCreator
+   * @param chunkSize the chunkSize
+   * @param offheap indicates if the chunk is to be created offheap or not
+   * @param globalMemStoreSize  the global memstore size
+   * @param poolSizePercentage pool size percentage
+   * @param initialCountPercentage the initial count of the chunk pool if any
+   * @param heapMemoryManager the heapmemory manager
+   * @return the singleton ChunkCreator
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
+      justification = "Method is called by single thread at the starting of RS")
+  @VisibleForTesting
+  public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize,
+      float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
+    if (INSTANCE != null) return INSTANCE;
+    INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
+        initialCountPercentage, heapMemoryManager);
+    return INSTANCE;
+  }
+
+  static ChunkCreator getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Creates and inits a chunk.
+   * @return the chunk that was initialized
+   */
+  Chunk getChunk() {
+    Chunk chunk = null;
+    if (pool != null) {
+      // the pool creates or reuses the chunk internally; chunk#init() is called below
+      chunk = this.pool.getChunk();
+      // a null chunk means the pool has run out of maxCount
+      if (chunk == null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount()
+              + ". Creating chunk onheap.");
+        }
+      }
+    }
+    if (chunk == null) {
+      chunk = createChunk();
+    }
+    // put this chunk into the chunkIdMap
+    this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
+    // now we need to actually do the expensive memory allocation step for a new chunk;
+    // for a reused chunk, only the offset is reset to the beginning of the chunk to accept allocations
+    chunk.init();
+    return chunk;
+  }
+
+  private Chunk createChunk() {
+    return createChunk(false);
+  }
+
+  /**
+   * Creates the chunk either onheap or offheap
+   * @param pool whether the chunk being created is for use by the pool
+   * @return the chunk
+   */
+  private Chunk createChunk(boolean pool) {
+    int id = chunkID.getAndIncrement();
+    assert id > 0;
+    // do not create offheap chunk on demand
+    if (pool && this.offheap) {
+      return new OffheapChunk(chunkSize, id, pool);
+    } else {
+      return new OnheapChunk(chunkSize, id, pool);
+    }
+  }
+
+  @VisibleForTesting
+  // TODO : To be used by CellChunkMap
+  Chunk getChunk(int id) {
+    SoftReference<Chunk> ref = chunkIdMap.get(id);
+    if (ref != null) {
+      return ref.get();
+    }
+    return null;
+  }
+
+  int getChunkSize() {
+    return this.chunkSize;
+  }
+
+  boolean isOffheap() {
+    return this.offheap;
+  }
+
+  private void removeChunks(Set<Integer> chunkIDs) {
+    this.chunkIdMap.keySet().removeAll(chunkIDs);
+  }
+
+  Chunk removeChunk(int chunkId) {
+    SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
+    if (ref != null) {
+      return ref.get();
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  int size() {
+    return this.chunkIdMap.size();
+  }
+
+  @VisibleForTesting
+  void clearChunkIds() {
+    this.chunkIdMap.clear();
+  }
+
+  /**
+   * A pool of {@link Chunk} instances.
+   *
+   * MemStoreChunkPool caches a number of retired chunks for reuse; this reduces
+   * the bytes allocated while writing, thereby easing garbage
+   * collection on the JVM.
+   */
+  private  class MemStoreChunkPool implements HeapMemoryTuneObserver {
+    private int maxCount;
+
+    // A queue of reclaimed chunks
+    private final BlockingQueue<Chunk> reclaimedChunks;
+    private final float poolSizePercentage;
+
+    /** Statistics thread schedule pool */
+    private final ScheduledExecutorService scheduleThreadPool;
+    /** Statistics thread */
+    private static final int statThreadPeriod = 60 * 5;
+    private final AtomicLong chunkCount = new AtomicLong();
+    private final AtomicLong reusedChunkCount = new AtomicLong();
+
+    MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) {
+      this.maxCount = maxCount;
+      this.poolSizePercentage = poolSizePercentage;
+      this.reclaimedChunks = new LinkedBlockingQueue<>();
+      for (int i = 0; i < initialCount; i++) {
+        Chunk chunk = createChunk(true);
+        chunk.init();
+        reclaimedChunks.add(chunk);
+      }
+      chunkCount.set(initialCount);
+      final String n = Thread.currentThread().getName();
+      scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+          .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
+      this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
+          statThreadPeriod, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Poll a chunk from the pool and reset it if not null; otherwise create a new chunk to return,
+     * provided we have not yet created the max allowed number of chunks. When we have already
+     * created the max allowed chunks and none are free right now, return null; it is then the
+     * caller's responsibility to make a chunk.
+     * Note: Chunks returned by this pool must be put back into the pool after use.
+     * @return a chunk
+     * @see #putbackChunks(Set)
+     */
+    Chunk getChunk() {
+      Chunk chunk = reclaimedChunks.poll();
+      if (chunk != null) {
+        chunk.reset();
+        reusedChunkCount.incrementAndGet();
+      } else {
+        // Make a chunk iff we have not yet created the maxCount chunks
+        while (true) {
+          long created = this.chunkCount.get();
+          if (created < this.maxCount) {
+            if (this.chunkCount.compareAndSet(created, created + 1)) {
+              chunk = createChunk(true);
+              break;
+            }
+          } else {
+            break;
+          }
+        }
+      }
+      return chunk;
+    }
+
+    /**
+     * Add the chunks to the pool; once the pool reaches its max size, the remaining
+     * chunks are skipped
+     * @param chunks
+     */
+    private void putbackChunks(Set<Integer> chunks) {
+      int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
+      Iterator<Integer> iterator = chunks.iterator();
+      while (iterator.hasNext()) {
+        Integer chunkId = iterator.next();
+        // always remove the chunk from the id map, whether or not it came from the pool
+        Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
+        if (chunk != null) {
+          if (chunk.isFromPool() && toAdd > 0) {
+            reclaimedChunks.add(chunk);
+          }
+          toAdd--;
+        }
+      }
+    }
+
+    private class StatisticsThread extends Thread {
+      StatisticsThread() {
+        super("MemStoreChunkPool.StatisticsThread");
+        setDaemon(true);
+      }
+
+      @Override
+      public void run() {
+        logStats();
+      }
+
+      private void logStats() {
+        if (!LOG.isDebugEnabled()) return;
+        long created = chunkCount.get();
+        long reused = reusedChunkCount.get();
+        long total = created + reused;
+        LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+            + ",created chunk count=" + created
+            + ",reused chunk count=" + reused
+            + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
+                (float) reused / (float) total, 2)));
+      }
+    }
+
+    private int getMaxCount() {
+      return this.maxCount;
+    }
+
+    @Override
+    public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
+      // don't do any tuning in case of offheap memstore
+      if (isOffheap()) {
+        LOG.warn("Not tuning the chunk pool as it is offheap");
+        return;
+      }
+      int newMaxCount =
+          (int) (newMemstoreSize * poolSizePercentage / getChunkSize());
+      if (newMaxCount != this.maxCount) {
+        // We need an adjustment in the chunks numbers
+        if (newMaxCount > this.maxCount) {
+          // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
+          // create and add them to Q
+          LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
+          this.maxCount = newMaxCount;
+        } else {
+          // Max chunks getting decreased. We may need to clear off some of the pooled chunks
+          // right away. If the extra chunks are currently in use, do not pool them when they come back
+          LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
+          this.maxCount = newMaxCount;
+          if (this.reclaimedChunks.size() > newMaxCount) {
+            synchronized (this) {
+              while (this.reclaimedChunks.size() > newMaxCount) {
+                this.reclaimedChunks.poll();
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static void clearDisableFlag() {
+    chunkPoolDisabled = false;
+  }
+
+  private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage,
+      float initialCountPercentage) {
+    if (poolSizePercentage <= 0) {
+      LOG.info("PoolSizePercentage is less than 0. So not using pool");
+      return null;
+    }
+    if (chunkPoolDisabled) {
+      return null;
+    }
+    if (poolSizePercentage > 1.0) {
+      throw new IllegalArgumentException(
+          MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
+    }
+    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize());
+    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+      throw new IllegalArgumentException(
+          MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
+    }
+    int initialCount = (int) (initialCountPercentage * maxCount);
+    LOG.info("Allocating MemStoreChunkPool with chunk size "
+        + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount
+        + ", initial count " + initialCount);
+    return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage);
+  }
+
+  @VisibleForTesting
+  int getMaxCount() {
+    if (pool != null) {
+      return pool.getMaxCount();
+    }
+    return 0;
+  }
+
+  @VisibleForTesting
+  int getPoolSize() {
+    if (pool != null) {
+      return pool.reclaimedChunks.size();
+    }
+    return 0;
+  }
+
+  /*
+   * Only used in testing
+   */
+  @VisibleForTesting
+  void clearChunksInPool() {
+    if (pool != null) {
+      pool.reclaimedChunks.clear();
+    }
+  }
+
+  synchronized void putbackChunks(Set<Integer> chunks) {
+    if (pool != null) {
+      pool.putbackChunks(chunks);
+    } else {
+      this.removeChunks(chunks);
+    }
+  }
+}
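
For orientation, the intended call pattern for the new singleton, as wired up
in HRegionServer below. A sketch only; the sizes here are made-up illustrative
values, and in the server they come from configuration.

    // once, at region server startup
    ChunkCreator.initialize(2 * 1024 * 1024 /* chunkSize */, false /* offheap */,
        1024L * 1024 * 1024 /* globalMemStoreSize */, 0.5f /* poolSizePercentage */,
        0.1f /* initialCountPercentage */, null /* heapMemoryManager */);

    // afterwards, from any MemStoreLAB
    ChunkCreator creator = ChunkCreator.getInstance();
    Chunk c = creator.getChunk();   // pooled chunk if available, else a fresh onheap one
    int id = c.getId();             // ids start at 1 and increase monotonically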

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b3b5113..41eb0a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements
       startServiceThreads();
       startHeapMemoryManager();
       // Call it after starting HeapMemoryManager.
-      initializeMemStoreChunkPool();
+      initializeMemStoreChunkCreator();
       LOG.info("Serving as " + this.serverName +
         ", RpcServer on " + rpcServices.isa +
         ", sessionid=0x" +
@@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements
     }
   }
 
-  private void initializeMemStoreChunkPool() {
+  protected void initializeMemStoreChunkCreator() {
     if (MemStoreLAB.isEnabled(conf)) {
       // MSLAB is enabled. So initialize MemStoreChunkPool
       // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
@@ -1506,12 +1506,10 @@ public class HRegionServer extends HasThread implements
       float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
           MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
       int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
-      MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
-          initialCountPercentage, chunkSize, offheap);
-      if (pool != null && this.hMemManager != null) {
-        // Register with Heap Memory manager
-        this.hMemManager.registerTuneObserver(pool);
-      }
+      // init the chunkCreator
+      ChunkCreator chunkCreator =
+          ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
+            initialCountPercentage, this.hMemManager);
     }
   }
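
The creator is driven by the same MemStoreLAB configuration keys the old pool
used; a sketch of overriding them (illustrative values, not recommendations):

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(MemStoreLAB.CHUNK_SIZE_KEY, 2 * 1024 * 1024);      // chunk size in bytes
    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.5f);       // pool size percentage
    conf.setFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, 0.1f);   // initial count percentage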
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
deleted file mode 100644
index b7ac212..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * A pool of {@link Chunk} instances.
- * 
- * MemStoreChunkPool caches a number of retired chunks for reusing, it could
- * decrease allocating bytes when writing, thereby optimizing the garbage
- * collection on JVM.
- * 
- * The pool instance is globally unique and could be obtained through
- * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)}
- * 
- * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
- * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
- * when MemStore clearing snapshot for flush
- */
-@SuppressWarnings("javadoc")
-@InterfaceAudience.Private
-public class MemStoreChunkPool implements HeapMemoryTuneObserver {
-  private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
-
-  // Static reference to the MemStoreChunkPool
-  static MemStoreChunkPool GLOBAL_INSTANCE;
-  /** Boolean whether we have disabled the memstore chunk pool entirely. */
-  static boolean chunkPoolDisabled = false;
-
-  private int maxCount;
-
-  // A queue of reclaimed chunks
-  private final BlockingQueue<Chunk> reclaimedChunks;
-  private final int chunkSize;
-  private final float poolSizePercentage;
-
-  /** Statistics thread schedule pool */
-  private final ScheduledExecutorService scheduleThreadPool;
-  /** Statistics thread */
-  private static final int statThreadPeriod = 60 * 5;
-  private final AtomicLong chunkCount = new AtomicLong();
-  private final AtomicLong reusedChunkCount = new AtomicLong();
-  private final boolean offheap;
-
-  MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage,
-      boolean offheap) {
-    this.maxCount = maxCount;
-    this.chunkSize = chunkSize;
-    this.poolSizePercentage = poolSizePercentage;
-    this.offheap = offheap;
-    this.reclaimedChunks = new LinkedBlockingQueue<>();
-    for (int i = 0; i < initialCount; i++) {
-      Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize);
-      chunk.init();
-      reclaimedChunks.add(chunk);
-    }
-    chunkCount.set(initialCount);
-    final String n = Thread.currentThread().getName();
-    scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
-        .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
-    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
-        statThreadPeriod, TimeUnit.SECONDS);
-  }
-
-  /**
-   * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
-   * not yet created max allowed chunks count. When we have already created max allowed chunks and
-   * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
-   * then.
-   * Note: Chunks returned by this pool must be put back to the pool after its use.
-   * @return a chunk
-   * @see #putbackChunk(Chunk)
-   * @see #putbackChunks(BlockingQueue)
-   */
-  Chunk getChunk() {
-    Chunk chunk = reclaimedChunks.poll();
-    if (chunk != null) {
-      chunk.reset();
-      reusedChunkCount.incrementAndGet();
-    } else {
-      // Make a chunk iff we have not yet created the maxCount chunks
-      while (true) {
-        long created = this.chunkCount.get();
-        if (created < this.maxCount) {
-          chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize);
-          if (this.chunkCount.compareAndSet(created, created + 1)) {
-            break;
-          }
-        } else {
-          break;
-        }
-      }
-    }
-    return chunk;
-  }
-
-  /**
-   * Add the chunks to the pool, when the pool achieves the max size, it will
-   * skip the remaining chunks
-   * @param chunks
-   */
-  synchronized void putbackChunks(BlockingQueue<Chunk> chunks) {
-    int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
-    Chunk chunk = null;
-    while ((chunk = chunks.poll()) != null && toAdd > 0) {
-      reclaimedChunks.add(chunk);
-      toAdd--;
-    }
-  }
-
-  /**
-   * Add the chunk to the pool, if the pool has achieved the max size, it will
-   * skip it
-   * @param chunk
-   */
-  synchronized void putbackChunk(Chunk chunk) {
-    if (reclaimedChunks.size() < this.maxCount) {
-      reclaimedChunks.add(chunk);
-    }
-  }
-
-  int getPoolSize() {
-    return this.reclaimedChunks.size();
-  }
-
-  /*
-   * Only used in testing
-   */
-  void clearChunks() {
-    this.reclaimedChunks.clear();
-  }
-
-  private class StatisticsThread extends Thread {
-    StatisticsThread() {
-      super("MemStoreChunkPool.StatisticsThread");
-      setDaemon(true);
-    }
-
-    @Override
-    public void run() {
-      logStats();
-    }
-
-    private void logStats() {
-      if (!LOG.isDebugEnabled()) return;
-      long created = chunkCount.get();
-      long reused = reusedChunkCount.get();
-      long total = created + reused;
-      LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
-          + ",created chunk count=" + created
-          + ",reused chunk count=" + reused
-          + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
-              (float) reused / (float) total, 2)));
-    }
-  }
-
-  /**
-   * @return the global MemStoreChunkPool instance
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
-      justification = "Method is called by single thread at the starting of RS")
-  static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
-      float initialCountPercentage, int chunkSize, boolean offheap) {
-    if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
-    if (chunkPoolDisabled) return null;
-
-    if (poolSizePercentage <= 0) {
-      chunkPoolDisabled = true;
-      return null;
-    }
-    if (poolSizePercentage > 1.0) {
-      throw new IllegalArgumentException(
-          MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
-    }
-    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
-    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
-      throw new IllegalArgumentException(
-          MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
-    }
-    int initialCount = (int) (initialCountPercentage * maxCount);
-    LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
-        + ", max count " + maxCount + ", initial count " + initialCount);
-    GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
-        offheap);
-    return GLOBAL_INSTANCE;
-  }
-
-  /**
-   * @return The singleton instance of this pool.
-   */
-  static MemStoreChunkPool getPool() {
-    return GLOBAL_INSTANCE;
-  }
-
-  int getMaxCount() {
-    return this.maxCount;
-  }
-
-  @VisibleForTesting
-  static void clearDisableFlag() {
-    chunkPoolDisabled = false;
-  }
-
-  @Override
-  public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
-    // don't do any tuning in case of offheap memstore
-    if (this.offheap) {
-      LOG.warn("Not tuning the chunk pool as it is offheap");
-      return;
-    }
-    int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
-    if (newMaxCount != this.maxCount) {
-      // We need an adjustment in the chunks numbers
-      if (newMaxCount > this.maxCount) {
-        // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
-        // create and add them to Q
-        LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
-        this.maxCount = newMaxCount;
-      } else {
-        // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
-        // itself. If the extra chunks are serving already, do not pool those when we get them back
-        LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
-        this.maxCount = newMaxCount;
-        if (this.reclaimedChunks.size() > newMaxCount) {
-          synchronized (this) {
-            while (this.reclaimedChunks.size() > newMaxCount) {
-              this.reclaimedChunks.poll();
-            }
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index f6d1607..72e937c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  * <p>
  * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from
  * and then doles it out to threads that request slices into the array. These chunks can get pooled
- * as well. See {@link MemStoreChunkPool}.
+ * as well. See {@link ChunkCreator}.
  * <p>
  * The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
  * Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  * {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this
  * cell's data and copies into this area and then recreate a Cell over this copied data.
  * <p>
- * @see MemStoreChunkPool
+ * @see ChunkCreator
  */
 @InterfaceAudience.Private
 public interface MemStoreLAB {
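
A sketch of the interface from a memstore's point of view, assuming the
interface's reflection-based newInstance factory and a Configuration named
conf (illustrative, not the full contract):

    MemStoreLAB lab = MemStoreLAB.newInstance(conf);  // reflectively builds the impl
    Cell copied = lab.copyCellInto(cell);             // cell data now lives in a chunk
    // ... the memstore keeps `copied`; when its snapshot is cleared and no
    // scanners remain open, close() lets the chunks be recycled
    lab.close();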

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 4e87135..4fba82d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -18,23 +18,26 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.nio.ByteBuffer;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
 /**
  * A memstore-local allocation buffer.
  * <p>
@@ -55,8 +58,8 @@ import com.google.common.base.Preconditions;
  * would provide a performance improvement - probably would speed up the
  * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
  * anyway.
- * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}.
- * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks,
+ * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
+ * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks,
 * which this MemStoreLAB creates on its own (when no chunk is available from the pool) are
 * always on heap backed.
  */
@@ -66,14 +69,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
   static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class);
 
   private AtomicReference<Chunk> curChunk = new AtomicReference<>();
-  // A queue of chunks from pool contained by this memstore LAB
-  // TODO: in the future, it would be better to have List implementation instead of Queue,
-  // as FIFO order is not so important here
+  // Lock to manage multiple handlers requesting a chunk
+  private ReentrantLock lock = new ReentrantLock();
+
+  // The ids of the chunks used by this memstore LAB
   @VisibleForTesting
-  BlockingQueue<Chunk> pooledChunkQueue = null;
+  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
   private final int chunkSize;
   private final int maxAlloc;
-  private final MemStoreChunkPool chunkPool;
+  private final ChunkCreator chunkCreator;
 
   // This flag is for closing this instance, its set when clearing snapshot of
   // memstore
@@ -92,20 +96,12 @@ public class MemStoreLABImpl implements MemStoreLAB {
   public MemStoreLABImpl(Configuration conf) {
     chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
     maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
-    this.chunkPool = MemStoreChunkPool.getPool();
-    // currently chunkQueue is only used for chunkPool
-    if (this.chunkPool != null) {
-      // set queue length to chunk pool max count to avoid keeping reference of
-      // too many non-reclaimable chunks
-      pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
-    }
-
+    this.chunkCreator = ChunkCreator.getInstance();
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
     Preconditions.checkArgument(maxAlloc <= chunkSize,
         MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
   }
 
-
   @Override
   public Cell copyCellInto(Cell cell) {
     int size = KeyValueUtil.length(cell);
@@ -118,19 +114,52 @@ public class MemStoreLABImpl implements MemStoreLAB {
     Chunk c = null;
     int allocOffset = 0;
     while (true) {
+      // Try to get the chunk
       c = getOrMakeChunk();
+      // we may get null because some other thread succeeded in getting the lock;
+      // the current thread then has to try again to make its chunk, or grab the chunk
+      // that the other thread created
       // Try to allocate from this chunk
-      allocOffset = c.alloc(size);
-      if (allocOffset != -1) {
-        // We succeeded - this is the common case - small alloc
-        // from a big buffer
-        break;
+      if (c != null) {
+        allocOffset = c.alloc(size);
+        if (allocOffset != -1) {
+          // We succeeded - this is the common case - small alloc
+          // from a big buffer
+          break;
+        }
+        // not enough space!
+        // try to retire this chunk
+        tryRetireChunk(c);
       }
-      // not enough space!
-      // try to retire this chunk
-      tryRetireChunk(c);
     }
-    return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
+    return copyToChunkCell(cell, c.getData(), allocOffset, size);
+  }
+
+  /**
+   * Clone the passed cell by copying its data into the passed buf, and create a chunkId-aware
+   * cell over the copied data
+   */
+  private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
+    int tagsLen = cell.getTagsLength();
+    if (cell instanceof ExtendedCell) {
+      ((ExtendedCell) cell).write(buf, offset);
+    } else {
+      // Normally all Cell impls within the server will be of type ExtendedCell. Just considering
+      // the other case also. The data fragments within the Cell are copied into buf in the
+      // KeyValue serialization format.
+      KeyValueUtil.appendTo(cell, buf, offset, true);
+    }
+    // TODO : write the seqid here. For writing seqId we should create a new cell type so
+    // that seqId is not used as the state
+    if (tagsLen == 0) {
+      // When tagsLen is 0, make a NoTagByteBufferChunkCell version. This is an optimized class
+      // which directly returns tagsLen as 0, so we avoid parsing many length components when
+      // reading the tagsLength stored in the backing buffer. The Memstore addition of every Cell
+      // calls getTagsLength().
+      return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
+    } else {
+      return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
+    }
   }
 
   /**
@@ -142,9 +171,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
     this.closed = true;
     // We could put back the chunks to pool for reusing only when there is no
     // opening scanner which will read their data
-    if (chunkPool != null && openScannerCount.get() == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.pooledChunkQueue);
+    int count = openScannerCount.get();
+    if (count == 0) {
+      recycleChunks();
     }
   }
 
@@ -162,9 +191,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
   @Override
   public void decScannerCount() {
     int count = this.openScannerCount.decrementAndGet();
-    if (this.closed && chunkPool != null && count == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.pooledChunkQueue);
+    if (this.closed && count == 0) {
+      recycleChunks();
+    }
+  }
+
+  private void recycleChunks() {
+    if (reclaimed.compareAndSet(false, true)) {
+      chunkCreator.putbackChunks(chunks);
     }
   }
 
@@ -190,45 +224,33 @@ public class MemStoreLABImpl implements MemStoreLAB {
    * allocate a new one from the JVM.
    */
   private Chunk getOrMakeChunk() {
-    while (true) {
-      // Try to get the chunk
-      Chunk c = curChunk.get();
-      if (c != null) {
-        return c;
-      }
-
-      // No current chunk, so we want to allocate one. We race
-      // against other allocators to CAS in an uninitialized chunk
-      // (which is cheap to allocate)
-      if (chunkPool != null) {
-        c = chunkPool.getChunk();
-      }
-      boolean pooledChunk = false;
-      if (c != null) {
-        // This is chunk from pool
-        pooledChunk = true;
-      } else {
-        c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap.
-      }
-      if (curChunk.compareAndSet(null, c)) {
-        // we won race - now we need to actually do the expensive
-        // allocation step
-        c.init();
-        if (pooledChunk) {
-          if (!this.closed && !this.pooledChunkQueue.offer(c)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
-                  + pooledChunkQueue.size());
-            }
-          }
+    // Try to get the chunk
+    Chunk c = curChunk.get();
+    if (c != null) {
+      return c;
+    }
+    // No current chunk, so we want to allocate one. We race
+    // against other allocators to CAS in an uninitialized chunk
+    // (which is cheap to allocate)
+    if (lock.tryLock()) {
+      try {
+        // once again check inside the lock
+        c = curChunk.get();
+        if (c != null) {
+          return c;
         }
-        return c;
-      } else if (pooledChunk) {
-        chunkPool.putbackChunk(c);
+        c = this.chunkCreator.getChunk();
+        if (c != null) {
+          // set the curChunk. No need for CAS as only one thread can be here
+          curChunk.set(c);
+          chunks.add(c.getId());
+          return c;
+        }
+      } finally {
+        lock.unlock();
       }
-      // someone else won race - that's fine, we'll try to grab theirs
-      // in the next iteration of the loop.
     }
+    return null;
   }
 
   @VisibleForTesting
@@ -236,8 +258,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
     return this.curChunk.get();
   }
 
-
+  @VisibleForTesting
   BlockingQueue<Chunk> getPooledChunks() {
-    return this.pooledChunkQueue;
+    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
+    for (Integer id : this.chunks) {
+      Chunk chunk = chunkCreator.getChunk(id);
+      if (chunk != null && chunk.isFromPool()) {
+        pooledChunks.add(chunk);
+      }
+    }
+    return pooledChunks;
   }
 }
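
The new getOrMakeChunk() replaces the old CAS race with a tryLock plus a
double-check. Condensed (abbreviated names, same structure as the code above):

    private Chunk getOrMakeChunkSketch() {
      Chunk c = curChunk.get();
      if (c != null) {
        return c;                    // fast path: a current chunk already exists
      }
      if (lock.tryLock()) {          // exactly one thread builds the next chunk
        try {
          c = curChunk.get();        // re-check now that we hold the lock
          if (c == null) {
            c = ChunkCreator.getInstance().getChunk();
            curChunk.set(c);         // single writer, so a plain set suffices
          }
          return c;
        } finally {
          lock.unlock();
        }
      }
      return null;                   // losers get null; copyCellInto() loops and retries
    }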

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
new file mode 100644
index 0000000..a8ba50c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+
+/**
+ * A ByteBuffer backed cell which has the chunkId embedded at the 0th offset of its buffer and has no tags
+ * @see MemStoreLAB
+ */
+@InterfaceAudience.Private
+public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue {
+
+  public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
+    super(buf, offset, length, seqId);
+  }
+
+  @Override
+  public int getChunkId() {
+    // The chunkId is embedded at the 0th offset of the bytebuffer
+    return ByteBufferUtils.toInt(buf, 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
index ed98cfa..e244a33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
@@ -21,34 +21,27 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
-import com.google.common.base.Preconditions;
-
 /**
  * An off heap chunk implementation.
  */
 @InterfaceAudience.Private
 public class OffheapChunk extends Chunk {
 
-  OffheapChunk(int size) {
-    super(size);
+  OffheapChunk(int size, int id) {
+    // Better if this is always created fromPool; this constructor is not expected to be called.
+    super(size, id);
+  }
+
+  OffheapChunk(int size, int id, boolean fromPool) {
+    super(size, id, fromPool);
+    assert fromPool;
   }
 
   @Override
-  public void init() {
-    assert nextFreeOffset.get() == UNINITIALIZED;
-    try {
-      if (data == null) {
-        data = ByteBuffer.allocateDirect(this.size);
-      }
-    } catch (OutOfMemoryError e) {
-      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-      assert failInit; // should be true.
-      throw e;
+  void allocateDataBuffer() {
+    if (data == null) {
+      data = ByteBuffer.allocateDirect(this.size);
+      data.putLong(0, this.getId());
     }
-    // Mark that it's ready for use
-    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
-    // We should always succeed the above CAS since only one thread
-    // calls init()!
-    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
   }
 }
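
Editor's note: after this change, on-heap and off-heap chunks differ only in how
the backing ByteBuffer is obtained; both write the chunk id into the first eight
bytes. A compact JDK-only sketch of the shared pattern (stampId is a made-up
helper for illustration):

    import java.nio.ByteBuffer;

    public class ChunkAllocationDemo {
      // Writes the chunk id header exactly once, at offset 0.
      static ByteBuffer stampId(ByteBuffer data, long id) {
        data.putLong(0, id);
        return data;
      }

      public static void main(String[] args) {
        int size = 4096;
        ByteBuffer onHeap  = stampId(ByteBuffer.allocate(size), 1L);       // byte[]-backed
        ByteBuffer offHeap = stampId(ByteBuffer.allocateDirect(size), 2L); // native memory
        System.out.println(onHeap.getLong(0) + " " + offHeap.getLong(0));  // 1 2
      }
    }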

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
index bd33cb5..da34e24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
@@ -21,33 +21,25 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
-import com.google.common.base.Preconditions;
-
 /**
  * An on heap chunk implementation.
  */
 @InterfaceAudience.Private
 public class OnheapChunk extends Chunk {
 
-  OnheapChunk(int size) {
-    super(size);
+  OnheapChunk(int size, int id) {
+    super(size, id);
+  }
+
+  OnheapChunk(int size, int id, boolean fromPool) {
+    super(size, id, fromPool);
   }
 
-  public void init() {
-    assert nextFreeOffset.get() == UNINITIALIZED;
-    try {
-      if (data == null) {
-        data = ByteBuffer.allocate(this.size);
-      }
-    } catch (OutOfMemoryError e) {
-      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-      assert failInit; // should be true.
-      throw e;
+  @Override
+  void allocateDataBuffer() {
+    if (data == null) {
+      data = ByteBuffer.allocate(this.size);
+      data.putLong(0, this.getId());
     }
-    // Mark that it's ready for use
-    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
-    // We should always succeed the above CAS since only one thread
-    // calls init()!
-    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 82c2eab..6563122 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -96,6 +96,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -2426,6 +2428,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
       final Configuration conf, final HTableDescriptor htd, boolean initialize)
       throws IOException {
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     WAL wal = createWal(conf, rootDir, info);
     return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
   }
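
Editor's note: ChunkCreator is effectively a process-wide singleton (note the
ChunkCreator.getInstance() usage later in this patch), so any test that builds
regions outside a full HRegionServer now has to initialize it first, as the hunk
above does; the same one-liner recurs through the rest of this patch. A sketch of
the pattern for a standalone test class (class and method names are illustrative;
the zero arguments disable the chunk pool, matching the call above):

    import org.apache.hadoop.hbase.regionserver.ChunkCreator;
    import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
    import org.junit.BeforeClass;

    public class MyRegionTest {
      @BeforeClass
      public static void setUpBeforeClass() throws Exception {
        // Pool disabled: the pool sizing arguments are zero, no heap memory manager.
        ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
      }
    }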

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 422c54b..8d8b6df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -49,8 +50,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -397,6 +400,7 @@ public class TestCoprocessorInterface {
     for(byte [] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
     }
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     HRegionInfo info = new HRegionInfo(tableName, null, null, false);
     Path path = new Path(DIR + callingMethod);
     Region r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 80d0e3a..b99087d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -47,10 +47,12 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -152,6 +154,7 @@ public class TestRegionObserverScannerOpenHook {
     for (byte[] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
     }
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     Path path = new Path(DIR + callingMethod);
     WAL wal = HBaseTestingUtility.createWal(conf, path, info);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
index 2e44dee..15d449d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@@ -100,6 +102,7 @@ public class TestRegionObserverStacking extends TestCase {
     for(byte [] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
     }
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     Path path = new Path(DIR + callingMethod);
     HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
index f1775d0..fae7247 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
@@ -40,8 +40,10 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -342,6 +344,7 @@ public class TestScannerFromBucketCache {
   private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
       String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
       byte[]... families) throws IOException {
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log");
     HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
     final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index cc73d9d..32bce26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Triple;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -88,6 +91,10 @@ public class TestCatalogJanitor {
   @Rule
   public TestName name = new TestName();
 
+  @BeforeClass
+  public static void setup() throws Exception {
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+  }
   /**
    * Mock MasterServices for tests below.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 418aadf..096c5ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -241,7 +241,7 @@ public class TestBulkLoad {
     for (byte[] family : families) {
       hTableDescriptor.addFamily(new HColumnDescriptor(family));
     }
-
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     // TODO We need a way to do this without creating files
     return HRegion.createHRegion(hRegionInfo,
         new Path(testFolder.newFolder().toURI()),

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 3b4d068..09877b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase {
     descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true);
     CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
     CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
-    MemStoreChunkPool.chunkPoolDisabled = false;
+    ChunkCreator.chunkPoolDisabled = false;
   }
 
   /* Create and test CellSet based on CellArrayMap */

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index a888c45..9e90f3e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,7 +51,7 @@ import static org.junit.Assert.assertTrue;
 public class TestCompactingMemStore extends TestDefaultMemStore {
 
   private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class);
-  protected static MemStoreChunkPool chunkPool;
+  protected static ChunkCreator chunkCreator;
   protected HRegion region;
   protected RegionServicesForStores regionServicesForStores;
   protected HStore store;
@@ -65,7 +66,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
   @After
   public void tearDown() throws Exception {
-    chunkPool.clearChunks();
+    chunkCreator.clearChunksInPool();
   }
 
   @Override
@@ -84,15 +85,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
     HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
-    this.region = hbaseUtility.createTestRegion("foobar", hcd);
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
+    htd.addFamily(hcd);
+    HRegionInfo info =
+        new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
+    WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
+    this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
     this.regionServicesForStores = region.getRegionServicesForStores();
     this.store = new HStore(region, hcd, conf);
 
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
         .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
-    chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
-        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
-    assertTrue(chunkPool != null);
+    chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+      globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+    assertTrue(chunkCreator != null);
   }
 
   /**
@@ -390,7 +397,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    int chunkCount = chunkPool.getPoolSize();
+    int chunkCount = chunkCreator.getPoolSize();
     assertTrue(chunkCount > 0);
 
   }
@@ -434,16 +441,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    assertTrue(chunkPool.getPoolSize() == 0);
+    assertTrue(chunkCreator.getPoolSize() == 0);
 
     // Chunks will be put back to pool after close scanners;
     for (KeyValueScanner scanner : scanners) {
       scanner.close();
     }
-    assertTrue(chunkPool.getPoolSize() > 0);
+    assertTrue(chunkCreator.getPoolSize() > 0);
 
     // clear chunks
-    chunkPool.clearChunks();
+    chunkCreator.clearChunksInPool();
 
     // Creating another snapshot
 
@@ -464,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       scanner.close();
     }
     memstore.clearSnapshot(snapshot.getId());
-    assertTrue(chunkPool.getPoolSize() > 0);
+    assertTrue(chunkCreator.getPoolSize() > 0);
   }
 
   @Test
@@ -516,16 +523,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
     assertEquals(3, memstore.getActive().getCellsCount());
 
-    assertTrue(chunkPool.getPoolSize() == 0);
+    assertTrue(chunkCreator.getPoolSize() == 0);
 
     // Chunks will be put back to pool after close scanners;
     for (KeyValueScanner scanner : scanners) {
       scanner.close();
     }
-    assertTrue(chunkPool.getPoolSize() > 0);
+    assertTrue(chunkCreator.getPoolSize() > 0);
 
     // clear chunks
-    chunkPool.clearChunks();
+    chunkCreator.clearChunksInPool();
 
     // Creating another snapshot
 
@@ -553,7 +560,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       scanner.close();
     }
     memstore.clearSnapshot(snapshot.getId());
-    assertTrue(chunkPool.getPoolSize() > 0);
+    assertTrue(chunkCreator.getPoolSize() > 0);
   }
 
   //////////////////////////////////////////////////////////////////////////////
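
Editor's note: the pool assertions above rely on the recycling contract that this
patch preserves across the MemStoreChunkPool-to-ChunkCreator move: a chunk goes
back to the pool only once the last scanner holding it is closed. A toy JDK-only
model of that contract (names are illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    // A pooled chunk is recycled when its reference count drops to zero.
    public class PoolRecycleDemo {
      static final BlockingQueue<byte[]> POOL = new LinkedBlockingQueue<>();

      static void close(byte[] chunk, AtomicInteger refCount) {
        if (refCount.decrementAndGet() == 0) {
          POOL.offer(chunk);          // the last closer returns the chunk
        }
      }

      public static void main(String[] args) {
        byte[] chunk = new byte[1024];
        AtomicInteger refCount = new AtomicInteger(2);   // e.g. two open scanners

        close(chunk, refCount);
        System.out.println("after first close:  pool size = " + POOL.size()); // 0
        close(chunk, refCount);
        System.out.println("after second close: pool size = " + POOL.size()); // 1
      }
    }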

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 5a48455..66e107a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -44,17 +44,13 @@ import java.util.List;
 public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
 
   private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
-  //private static MemStoreChunkPool chunkPool;
-  //private HRegion region;
-  //private RegionServicesForStores regionServicesForStores;
-  //private HStore store;
 
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
   //////////////////////////////////////////////////////////////////////////////
 
   @Override public void tearDown() throws Exception {
-    chunkPool.clearChunks();
+    chunkCreator.clearChunksInPool();
   }
 
   @Override public void setUp() throws Exception {
@@ -408,16 +404,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    assertTrue(chunkPool.getPoolSize() == 0);
+    assertTrue(chunkCreator.getPoolSize() == 0);
 
     // Chunks will be put back to pool after close scanners;
     for (KeyValueScanner scanner : scanners) {
       scanner.close();
     }
-    assertTrue(chunkPool.getPoolSize() > 0);
+    assertTrue(chunkCreator.getPoolSize() > 0);
 
     // clear chunks
-    chunkPool.clearChunks();
+    chunkCreator.clearChunksInPool();
 
     // Creating another snapshot
 
@@ -438,7 +434,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
       scanner.close();
     }
     memstore.clearSnapshot(snapshot.getId());
-    assertTrue(chunkPool.getPoolSize() > 0);
+    assertTrue(chunkCreator.getPoolSize() > 0);
   }
 
   @Test
@@ -472,7 +468,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    int chunkCount = chunkPool.getPoolSize();
+    int chunkCount = chunkCreator.getPoolSize();
     assertTrue(chunkCount > 0);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
index 8e85730..e320368 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
@@ -164,6 +164,7 @@ public class TestCompactionArchiveConcurrentClose {
 
     HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf),
         tableDir, info);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, tableDir);
     final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
index 89b2368..e7fcf18 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
@@ -174,6 +174,7 @@ public class TestCompactionArchiveIOException {
   private HRegion initHRegion(HTableDescriptor htd, HRegionInfo info)
       throws IOException {
     Configuration conf = testUtil.getConfiguration();
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
     Path regionDir = new Path(tableDir, info.getEncodedName());
     Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
index 7154511..bff5bec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
@@ -104,6 +104,7 @@ public class TestCompactionPolicy {
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
 
     hlog = new FSHLog(fs, basedir, logName, conf);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
     region.close();
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 7434eb1..41b304b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -84,6 +85,7 @@ public class TestDefaultMemStore {
   protected static final byte[] FAMILY = Bytes.toBytes("column");
   protected MultiVersionConcurrencyControl mvcc;
   protected AtomicLong startSeqNum = new AtomicLong(0);
+  protected ChunkCreator chunkCreator;
 
   private String getName() {
     return this.name.getMethodName();
@@ -92,9 +94,17 @@ public class TestDefaultMemStore {
   @Before
   public void setUp() throws Exception {
     internalSetUp();
+    // no pool
+    this.chunkCreator =
+        ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     this.memstore = new DefaultMemStore();
   }
 
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    ChunkCreator.getInstance().clearChunkIds();
+  }
+
   protected void internalSetUp() throws Exception {
     this.mvcc = new MultiVersionConcurrencyControl();
   }
@@ -129,7 +139,9 @@ public class TestDefaultMemStore {
       assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
       // make sure chunk size increased even when writing the same cell, if using MSLAB
       if (msLab instanceof MemStoreLABImpl) {
-        assertEquals(2 * Segment.getCellLength(kv),
+        // Since we add the chunkID as a long at the 0th offset of the chunk,
+        // we need to account for those 8 bytes.
+        assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG,
           ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset());
       }
     } else {
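
Editor's note: the adjusted assertion above falls straight out of the new chunk
layout: allocations begin after the eight-byte id header, so writing two identical
cells leaves the next free offset at 2 * cellLength + 8. A JDK-only sketch of a
bump allocator with that header (names are illustrative):

    import java.util.concurrent.atomic.AtomicInteger;

    // Bump allocator whose first allocation starts after the 8-byte id header.
    public class HeaderedBumpAllocator {
      private static final int HEADER = Long.BYTES;      // 8 bytes for the chunk id
      private final AtomicInteger nextFreeOffset = new AtomicInteger(HEADER);

      int alloc(int size) {
        return nextFreeOffset.getAndAdd(size);           // offset of this allocation
      }

      public static void main(String[] args) {
        HeaderedBumpAllocator lab = new HeaderedBumpAllocator();
        int cellLength = 40;                             // any fixed cell size
        lab.alloc(cellLength);
        lab.alloc(cellLength);
        System.out.println(lab.nextFreeOffset.get() == 2 * cellLength + Long.BYTES); // true
      }
    }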

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 73fb9cf..24e850d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -266,6 +266,7 @@ public class TestFailedAppendAndSync {
    */
   public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
   throws IOException {
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
       wal, COLUMN_FAMILY_BYTES);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2c2178b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index b416c7d..0f24a24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -153,7 +153,7 @@ public class TestHMobStore {
 
     htd.addFamily(hcd);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
-
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, basedir);
     final WALFactory wals = new WALFactory(walConf, null, methodName);