Posted to commits@hbase.apache.org by ra...@apache.org on 2017/04/17 18:30:47 UTC

[2/2] hbase git commit: Revert "HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)"

Revert "HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)"

This reverts commit c2c2178b2eebe4439eadec6b37fae2566944c16b.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ecdfb823
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ecdfb823
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ecdfb823

Branch: refs/heads/master
Commit: ecdfb82326035ad8221940919bbeb3fe16ec2658
Parents: c2c2178
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Apr 18 00:00:12 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Tue Apr 18 00:00:12 2017 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/CellUtil.java  |  24 ++
 .../org/apache/hadoop/hbase/ExtendedCell.java   |  10 -
 .../org/apache/hadoop/hbase/master/HMaster.java |   2 -
 .../hbase/regionserver/ByteBufferChunkCell.java |  48 ---
 .../apache/hadoop/hbase/regionserver/Chunk.java |  60 +--
 .../hadoop/hbase/regionserver/ChunkCreator.java | 404 -------------------
 .../hbase/regionserver/HRegionServer.java       |  14 +-
 .../hbase/regionserver/MemStoreChunkPool.java   | 265 ++++++++++++
 .../hadoop/hbase/regionserver/MemStoreLAB.java  |   4 +-
 .../hbase/regionserver/MemStoreLABImpl.java     | 171 ++++----
 .../regionserver/NoTagByteBufferChunkCell.java  |  48 ---
 .../hadoop/hbase/regionserver/OffheapChunk.java |  31 +-
 .../hadoop/hbase/regionserver/OnheapChunk.java  |  32 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |   3 -
 .../coprocessor/TestCoprocessorInterface.java   |   4 -
 .../TestRegionObserverScannerOpenHook.java      |   3 -
 .../coprocessor/TestRegionObserverStacking.java |   3 -
 .../io/hfile/TestScannerFromBucketCache.java    |   3 -
 .../hadoop/hbase/master/TestCatalogJanitor.java |   7 -
 .../hadoop/hbase/regionserver/TestBulkLoad.java |   2 +-
 .../hbase/regionserver/TestCellFlatSet.java     |   2 +-
 .../regionserver/TestCompactingMemStore.java    |  37 +-
 .../TestCompactingToCellArrayMapMemStore.java   |  16 +-
 .../TestCompactionArchiveConcurrentClose.java   |   1 -
 .../TestCompactionArchiveIOException.java       |   1 -
 .../regionserver/TestCompactionPolicy.java      |   1 -
 .../hbase/regionserver/TestDefaultMemStore.java |  14 +-
 .../regionserver/TestFailedAppendAndSync.java   |   1 -
 .../hbase/regionserver/TestHMobStore.java       |   2 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |   2 -
 .../regionserver/TestHRegionReplayEvents.java   |   2 +-
 .../regionserver/TestMemStoreChunkPool.java     |  48 +--
 .../hbase/regionserver/TestMemStoreLAB.java     |  27 +-
 .../TestMemstoreLABWithoutPool.java             | 168 --------
 .../hbase/regionserver/TestRecoveredEdits.java  |   1 -
 .../hbase/regionserver/TestRegionIncrement.java |   1 -
 .../hadoop/hbase/regionserver/TestStore.java    |   1 -
 .../TestStoreFileRefresherChore.java            |   1 -
 .../hbase/regionserver/TestWALLockup.java       |   1 -
 .../TestWALMonotonicallyIncreasingSeqId.java    |   1 -
 .../hbase/regionserver/wal/TestDurability.java  |   3 -
 41 files changed, 479 insertions(+), 990 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 56de21b..e1bc969 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -3135,4 +3135,28 @@ public final class CellUtil {
       return Type.DeleteFamily.getCode();
     }
   }
+
+  /**
+   * Clone the passed cell by copying its data into the passed buf.
+   */
+  public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) {
+    int tagsLen = cell.getTagsLength();
+    if (cell instanceof ExtendedCell) {
+      ((ExtendedCell) cell).write(buf, offset);
+    } else {
+      // Normally all Cell impls within the server will be of type ExtendedCell, but handle the
+      // other case as well. The data fragments within the Cell are copied into buf in the
+      // KeyValue serialization format.
+      KeyValueUtil.appendTo(cell, buf, offset, true);
+    }
+    if (tagsLen == 0) {
+      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
+      // which directly returns tagsLen as 0, so we avoid parsing the many length components
+      // needed to read the tags length stored in the backing buffer. The memstore addition of
+      // every Cell calls getTagsLength().
+      return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
+    } else {
+      return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
+    }
+  }
 }
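
As a usage note for the restored helper: below is a minimal, self-contained sketch of calling CellUtil.copyCellTo() the way an MSLAB does. The cell contents and the example class name are illustrative, not part of this patch.

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class CopyCellToExample {
  public static void main(String[] args) {
    // Build a small cell; the row/family/qualifier/value are illustrative.
    Cell src = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q"), Bytes.toBytes("value"));
    int len = KeyValueUtil.length(src);
    // Allocate a buffer large enough for the serialized cell, as an MSLAB
    // chunk would, then copy the cell in at offset 0.
    ByteBuffer buf = ByteBuffer.allocate(len);
    Cell copied = CellUtil.copyCellTo(src, buf, 0, len);
    // 'copied' is backed by 'buf': a ByteBufferKeyValue, or the cheaper
    // NoTagsByteBufferKeyValue when the source cell carries no tags.
    System.out.println(CellUtil.getCellKeyAsString(copied));
  }
}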

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
index 10f20ca..517873f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.io.HeapSize;
 public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize,
     Cloneable {
 
-  public static int CELL_NOT_BASED_ON_CHUNK = -1;
   /**
    * Write this cell to an OutputStream in a {@link KeyValue} format.
    * <br> KeyValue format <br>
@@ -74,13 +73,4 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
    * @return The deep cloned cell
    */
   Cell deepClone();
-
-  /**
-   * Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized
-   * chunks as in case of MemstoreLAB
-   * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1
-   */
-  default int getChunkId() {
-    return CELL_NOT_BASED_ON_CHUNK;
-  }
 }
\ No newline at end of file
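
For context, the methods removed above relied on a default-method sentinel: implementations not backed by a chunk inherit the -1 default, and only chunk-backed cells override it. A generic sketch of the pattern follows; ChunkAware and ChunkBackedCell are illustrative names, not HBase types.

interface ChunkAware {
  int CELL_NOT_BASED_ON_CHUNK = -1;

  /** @return the chunk id if backed by a fixed-size chunk, else -1. */
  default int getChunkId() {
    return CELL_NOT_BASED_ON_CHUNK;
  }
}

class ChunkBackedCell implements ChunkAware {
  private final int chunkId;

  ChunkBackedCell(int chunkId) {
    this.chunkId = chunkId;
  }

  @Override
  public int getChunkId() {
    return chunkId; // chunk-backed cells report their real id
  }
}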

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f9670e1..bb9f282 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -748,8 +748,6 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     this.masterActiveTime = System.currentTimeMillis();
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
-    // Initialize the chunkCreator
-    initializeMemStoreChunkCreator();
     this.fileSystemManager = new MasterFileSystem(this);
     this.walManager = new MasterWalManager(this);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
deleted file mode 100644
index a8f1000..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.ByteBufferKeyValue;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-
-/**
- * ByteBuffer based cell which has the chunkid at the 0th offset
- * @see MemStoreLAB
- */
-//TODO : When moving this cell to CellChunkMap we will have the following things
-// to be serialized
-// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes
-@InterfaceAudience.Private
-public class ByteBufferChunkCell extends ByteBufferKeyValue {
-  public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
-    super(buf, offset, length);
-  }
-
-  public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
-    super(buf, offset, length, seqId);
-  }
-
-  @Override
-  public int getChunkId() {
-    // The chunkId is embedded at the 0th offset of the bytebuffer
-    return ByteBufferUtils.toInt(buf, 0);
-  }
-}
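
The class deleted above recovered the chunk id straight from its backing buffer. A minimal sketch of that embedding, under stated assumptions: the putInt call and sizes here are illustrative, while the reverted patch actually reserved the first 8 bytes of each chunk as the id header, with cell data starting past it.

import java.nio.ByteBuffer;

public class ChunkIdEmbeddingSketch {
  public static void main(String[] args) {
    int chunkId = 42;                        // id handed out by the chunk creator
    ByteBuffer chunk = ByteBuffer.allocate(64);
    // Write the id once at offset 0 when the chunk is initialized.
    chunk.putInt(0, chunkId);
    // ... cells are then sliced from offsets past the header ...
    // Any cell sharing this backing buffer can recover the id without a
    // back-pointer to the Chunk object, which is what getChunkId() did.
    System.out.println("chunk id = " + chunk.getInt(0));
  }
}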

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
index fc4aa0b..2cbf0a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -21,10 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 /**
  * A chunk of memory out of which allocations are sliced.
@@ -48,41 +46,13 @@ public abstract class Chunk {
   /** Size of chunk in bytes */
   protected final int size;
 
-  // The unique id associated with the chunk.
-  private final int id;
-
-  // indicates if the chunk is formed by ChunkCreator#MemstorePool
-  private final boolean fromPool;
-
-  /**
-   * Create an uninitialized chunk. Note that memory is not allocated yet, so
-   * this is cheap.
-   * @param size in bytes
-   * @param id the chunk id
-   */
-  public Chunk(int size, int id) {
-    this(size, id, false);
-  }
-
   /**
-   * Create an uninitialized chunk. Note that memory is not allocated yet, so
-   * this is cheap.
+   * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
+   *
    * @param size in bytes
-   * @param id the chunk id
-   * @param fromPool if the chunk is formed by pool
    */
-  public Chunk(int size, int id, boolean fromPool) {
+  Chunk(int size) {
     this.size = size;
-    this.id = id;
-    this.fromPool = fromPool;
-  }
-
-  int getId() {
-    return this.id;
-  }
-
-  boolean isFromPool() {
-    return this.fromPool;
   }
 
   /**
@@ -90,24 +60,7 @@ public abstract class Chunk {
    * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
    * until the allocation is complete.
    */
-  public void init() {
-    assert nextFreeOffset.get() == UNINITIALIZED;
-    try {
-      allocateDataBuffer();
-    } catch (OutOfMemoryError e) {
-      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-      assert failInit; // should be true.
-      throw e;
-    }
-    // Mark that it's ready for use
-    // Move 8 bytes since the first 8 bytes are having the chunkid in it
-    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG);
-    // We should always succeed the above CAS since only one thread
-    // calls init()!
-    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
-  }
-
-  abstract void allocateDataBuffer();
+  public abstract void init();
 
   /**
   * Reset the offset to UNINITIALIZED before reusing an old chunk
@@ -121,8 +74,7 @@ public abstract class Chunk {
 
   /**
    * Try to allocate <code>size</code> bytes from the chunk.
-   * If a chunk is tried to get allocated before init() call, the thread doing the allocation
-   * will be in busy-wait state as it will keep looping till the nextFreeOffset is set.
+   *
    * @return the offset of the successful allocation, or -1 to indicate not-enough-space
    */
   public int alloc(int size) {
@@ -144,7 +96,7 @@ public abstract class Chunk {
       if (oldOffset + size > data.capacity()) {
         return -1; // alloc doesn't fit
       }
-      // TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset
+
       // Try to atomically claim this chunk
       if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
         // we got the alloc
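
For readers skimming the hunk above, here is a minimal self-contained sketch of the bump-the-pointer CAS loop that Chunk.alloc() implements; the class name and fixed capacity are illustrative.

import java.util.concurrent.atomic.AtomicInteger;

public class BumpPointerAllocSketch {
  private final int capacity = 2 * 1024 * 1024;            // chunk size, assumed
  private final AtomicInteger nextFreeOffset = new AtomicInteger(0);

  /** @return the offset of the allocation, or -1 if it does not fit. */
  public int alloc(int size) {
    while (true) {
      int oldOffset = nextFreeOffset.get();
      if (oldOffset + size > capacity) {
        return -1; // alloc doesn't fit in this chunk
      }
      // Atomically claim [oldOffset, oldOffset + size); retry if another
      // thread moved the cursor first.
      if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
        return oldOffset;
      }
    }
  }
}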

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
deleted file mode 100644
index 073fb25..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ /dev/null
@@ -1,404 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.lang.ref.SoftReference;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated
- * with every chunk
- */
-@InterfaceAudience.Private
-public class ChunkCreator {
-  private final Log LOG = LogFactory.getLog(ChunkCreator.class);
-  // monotonically increasing chunkid
-  private AtomicInteger chunkID = new AtomicInteger(1);
-  // maps the chunk against the monotonically increasing chunk id. We need to preserve the
-  // natural ordering of the key
-  // CellChunkMap creation should convert the soft ref to hard reference
-  private Map<Integer, SoftReference<Chunk>> chunkIdMap =
-      new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
-  private final int chunkSize;
-  private final boolean offheap;
-  @VisibleForTesting
-  static ChunkCreator INSTANCE;
-  @VisibleForTesting
-  static boolean chunkPoolDisabled = false;
-  private MemStoreChunkPool pool;
-
-  @VisibleForTesting
-  ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage,
-      float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
-    this.chunkSize = chunkSize;
-    this.offheap = offheap;
-    this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage);
-    if (heapMemoryManager != null && this.pool != null) {
-      // Register with Heap Memory manager
-      heapMemoryManager.registerTuneObserver(this.pool);
-    }
-  }
-
-  /**
-   * Initializes the instance of MSLABChunkCreator
-   * @param chunkSize the chunkSize
-   * @param offheap indicates if the chunk is to be created offheap or not
-   * @param globalMemStoreSize  the global memstore size
-   * @param poolSizePercentage pool size percentage
-   * @param initialCountPercentage the initial count of the chunk pool if any
-   * @param heapMemoryManager the heapmemory manager
-   * @return singleton MSLABChunkCreator
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
-      justification = "Method is called by single thread at the starting of RS")
-  @VisibleForTesting
-  public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize,
-      float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
-    if (INSTANCE != null) return INSTANCE;
-    INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
-        initialCountPercentage, heapMemoryManager);
-    return INSTANCE;
-  }
-
-  static ChunkCreator getInstance() {
-    return INSTANCE;
-  }
-
-  /**
-   * Creates and inits a chunk.
-   * @return the chunk that was initialized
-   */
-  Chunk getChunk() {
-    Chunk chunk = null;
-    if (pool != null) {
-      //  the pool creates the chunk internally. The chunk#init() call happens here
-      chunk = this.pool.getChunk();
-      // the pool has run out of maxCount
-      if (chunk == null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount()
-              + ". Creating chunk onheap.");
-        }
-      }
-    }
-    if (chunk == null) {
-      chunk = createChunk();
-    }
-    // put this chunk into the chunkIdMap
-    this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
-    // now we need to actually do the expensive memory allocation step in case of a new chunk,
-    // else only the offset is set to the beginning of the chunk to accept allocations
-    chunk.init();
-    return chunk;
-  }
-
-  private Chunk createChunk() {
-    return createChunk(false);
-  }
-
-  /**
-   * Creates the chunk either onheap or offheap
-   * @param pool indicates if the chunks have to be created which will be used by the Pool
-   * @return the chunk
-   */
-  private Chunk createChunk(boolean pool) {
-    int id = chunkID.getAndIncrement();
-    assert id > 0;
-    // do not create offheap chunk on demand
-    if (pool && this.offheap) {
-      return new OffheapChunk(chunkSize, id, pool);
-    } else {
-      return new OnheapChunk(chunkSize, id, pool);
-    }
-  }
-
-  @VisibleForTesting
-  // TODO : To be used by CellChunkMap
-  Chunk getChunk(int id) {
-    SoftReference<Chunk> ref = chunkIdMap.get(id);
-    if (ref != null) {
-      return ref.get();
-    }
-    return null;
-  }
-
-  int getChunkSize() {
-    return this.chunkSize;
-  }
-
-  boolean isOffheap() {
-    return this.offheap;
-  }
-
-  private void removeChunks(Set<Integer> chunkIDs) {
-    this.chunkIdMap.keySet().removeAll(chunkIDs);
-  }
-
-  Chunk removeChunk(int chunkId) {
-    SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
-    if (ref != null) {
-      return ref.get();
-    }
-    return null;
-  }
-
-  @VisibleForTesting
-  int size() {
-    return this.chunkIdMap.size();
-  }
-
-  @VisibleForTesting
-  void clearChunkIds() {
-    this.chunkIdMap.clear();
-  }
-
-  /**
-   * A pool of {@link Chunk} instances.
-   *
-   * MemStoreChunkPool caches a number of retired chunks for reusing, it could
-   * decrease allocating bytes when writing, thereby optimizing the garbage
-   * collection on JVM.
-   */
-  private  class MemStoreChunkPool implements HeapMemoryTuneObserver {
-    private int maxCount;
-
-    // A queue of reclaimed chunks
-    private final BlockingQueue<Chunk> reclaimedChunks;
-    private final float poolSizePercentage;
-
-    /** Statistics thread schedule pool */
-    private final ScheduledExecutorService scheduleThreadPool;
-    /** Statistics thread */
-    private static final int statThreadPeriod = 60 * 5;
-    private final AtomicLong chunkCount = new AtomicLong();
-    private final AtomicLong reusedChunkCount = new AtomicLong();
-
-    MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) {
-      this.maxCount = maxCount;
-      this.poolSizePercentage = poolSizePercentage;
-      this.reclaimedChunks = new LinkedBlockingQueue<>();
-      for (int i = 0; i < initialCount; i++) {
-        Chunk chunk = createChunk(true);
-        chunk.init();
-        reclaimedChunks.add(chunk);
-      }
-      chunkCount.set(initialCount);
-      final String n = Thread.currentThread().getName();
-      scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
-          .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
-      this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
-          statThreadPeriod, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
-     * not yet created max allowed chunks count. When we have already created max allowed chunks and
-     * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
-     * then.
-     * Note: Chunks returned by this pool must be put back to the pool after its use.
-     * @return a chunk
-     * @see #putbackChunks(Set)
-     */
-    Chunk getChunk() {
-      Chunk chunk = reclaimedChunks.poll();
-      if (chunk != null) {
-        chunk.reset();
-        reusedChunkCount.incrementAndGet();
-      } else {
-        // Make a chunk iff we have not yet created the maxCount chunks
-        while (true) {
-          long created = this.chunkCount.get();
-          if (created < this.maxCount) {
-            if (this.chunkCount.compareAndSet(created, created + 1)) {
-              chunk = createChunk(true);
-              break;
-            }
-          } else {
-            break;
-          }
-        }
-      }
-      return chunk;
-    }
-
-    /**
-     * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
-     * chunks
-     * @param chunks
-     */
-    private void putbackChunks(Set<Integer> chunks) {
-      int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
-      Iterator<Integer> iterator = chunks.iterator();
-      while (iterator.hasNext()) {
-        Integer chunkId = iterator.next();
-        // remove the chunks every time though they are from the pool or not
-        Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
-        if (chunk != null) {
-          if (chunk.isFromPool() && toAdd > 0) {
-            reclaimedChunks.add(chunk);
-          }
-          toAdd--;
-        }
-      }
-    }
-
-    private class StatisticsThread extends Thread {
-      StatisticsThread() {
-        super("MemStoreChunkPool.StatisticsThread");
-        setDaemon(true);
-      }
-
-      @Override
-      public void run() {
-        logStats();
-      }
-
-      private void logStats() {
-        if (!LOG.isDebugEnabled()) return;
-        long created = chunkCount.get();
-        long reused = reusedChunkCount.get();
-        long total = created + reused;
-        LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
-            + ",created chunk count=" + created
-            + ",reused chunk count=" + reused
-            + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
-                (float) reused / (float) total, 2)));
-      }
-    }
-
-    private int getMaxCount() {
-      return this.maxCount;
-    }
-
-    @Override
-    public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
-      // don't do any tuning in case of offheap memstore
-      if (isOffheap()) {
-        LOG.warn("Not tuning the chunk pool as it is offheap");
-        return;
-      }
-      int newMaxCount =
-          (int) (newMemstoreSize * poolSizePercentage / getChunkSize());
-      if (newMaxCount != this.maxCount) {
-        // We need an adjustment in the chunks numbers
-        if (newMaxCount > this.maxCount) {
-          // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
-          // create and add them to Q
-          LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
-          this.maxCount = newMaxCount;
-        } else {
-          // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
-          // itself. If the extra chunks are serving already, do not pool those when we get them back
-          LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
-          this.maxCount = newMaxCount;
-          if (this.reclaimedChunks.size() > newMaxCount) {
-            synchronized (this) {
-              while (this.reclaimedChunks.size() > newMaxCount) {
-                this.reclaimedChunks.poll();
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  @VisibleForTesting
-  static void clearDisableFlag() {
-    chunkPoolDisabled = false;
-  }
-
-  private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage,
-      float initialCountPercentage) {
-    if (poolSizePercentage <= 0) {
-      LOG.info("PoolSizePercentage is less than 0. So not using pool");
-      return null;
-    }
-    if (chunkPoolDisabled) {
-      return null;
-    }
-    if (poolSizePercentage > 1.0) {
-      throw new IllegalArgumentException(
-          MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
-    }
-    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize());
-    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
-      throw new IllegalArgumentException(
-          MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
-    }
-    int initialCount = (int) (initialCountPercentage * maxCount);
-    LOG.info("Allocating MemStoreChunkPool with chunk size "
-        + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount
-        + ", initial count " + initialCount);
-    return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage);
-  }
-
-  @VisibleForTesting
-  int getMaxCount() {
-    if (pool != null) {
-      return pool.getMaxCount();
-    }
-    return 0;
-  }
-
-  @VisibleForTesting
-  int getPoolSize() {
-    if (pool != null) {
-      return pool.reclaimedChunks.size();
-    }
-    return 0;
-  }
-
-  /*
-   * Only used in testing
-   */
-  @VisibleForTesting
-  void clearChunksInPool() {
-    if (pool != null) {
-      pool.reclaimedChunks.clear();
-    }
-  }
-
-  synchronized void putbackChunks(Set<Integer> chunks) {
-    if (pool != null) {
-      pool.putbackChunks(chunks);
-    } else {
-      this.removeChunks(chunks);
-    }
-  }
-}
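
The core bookkeeping the deleted ChunkCreator performed (a monotonically increasing id plus a softly referenced id-to-chunk map) reduces to the following generic sketch; ChunkRegistrySketch is an illustrative name.

import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ChunkRegistrySketch<C> {
  // monotonically increasing id, as in the deleted ChunkCreator
  private final AtomicInteger chunkID = new AtomicInteger(1);
  // SoftReferences let the GC reclaim chunks under memory pressure
  private final Map<Integer, SoftReference<C>> chunkIdMap = new ConcurrentHashMap<>();

  int register(C chunk) {
    int id = chunkID.getAndIncrement();
    chunkIdMap.put(id, new SoftReference<>(chunk));
    return id;
  }

  C lookup(int id) {
    SoftReference<C> ref = chunkIdMap.get(id);
    return ref == null ? null : ref.get(); // null if unknown or collected
  }
}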

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 41eb0a3..b3b5113 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements
       startServiceThreads();
       startHeapMemoryManager();
       // Call it after starting HeapMemoryManager.
-      initializeMemStoreChunkCreator();
+      initializeMemStoreChunkPool();
       LOG.info("Serving as " + this.serverName +
         ", RpcServer on " + rpcServices.isa +
         ", sessionid=0x" +
@@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements
     }
   }
 
-  protected void initializeMemStoreChunkCreator() {
+  private void initializeMemStoreChunkPool() {
     if (MemStoreLAB.isEnabled(conf)) {
       // MSLAB is enabled. So initialize MemStoreChunkPool
       // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
@@ -1506,10 +1506,12 @@ public class HRegionServer extends HasThread implements
       float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
           MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
       int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
-      // init the chunkCreator
-      ChunkCreator chunkCreator =
-          ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
-            initialCountPercentage, this.hMemManager);
+      MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
+          initialCountPercentage, chunkSize, offheap);
+      if (pool != null && this.hMemManager != null) {
+        // Register with Heap Memory manager
+        this.hMemManager.registerTuneObserver(pool);
+      }
     }
   }
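
For intuition on the sizing arithmetic restored here, a small worked example; the input values are assumed for illustration, while the formulas mirror initializeMemStoreChunkPool() and MemStoreChunkPool.initialize().

public class PoolSizingExample {
  public static void main(String[] args) {
    // Assumed inputs; real values come from the MemStoreLAB config keys and
    // the memstore flusher's global limit.
    long globalMemStoreSize = 4L * 1024 * 1024 * 1024; // 4 GB
    int chunkSize = 2 * 1024 * 1024;                   // 2 MB
    float poolSizePercentage = 1.0f;
    float initialCountPercentage = 0.1f;
    // Same arithmetic as MemStoreChunkPool.initialize():
    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
    int initialCount = (int) (initialCountPercentage * maxCount);
    System.out.println("maxCount=" + maxCount + ", initialCount=" + initialCount);
    // Prints: maxCount=2048, initialCount=204
  }
}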
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
new file mode 100644
index 0000000..b7ac212
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -0,0 +1,265 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A pool of {@link Chunk} instances.
+ * 
+ * MemStoreChunkPool caches a number of retired chunks for reuse; this can
+ * reduce the bytes allocated while writing and thereby ease garbage
+ * collection on the JVM.
+ *
+ * The pool instance is globally unique and can be obtained through
+ * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)}.
+ *
+ * {@link MemStoreChunkPool#getChunk()} is called when a MemStoreLAB is
+ * allocating bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)}
+ * is called when a MemStore clears its snapshot for a flush.
+ */
+@SuppressWarnings("javadoc")
+@InterfaceAudience.Private
+public class MemStoreChunkPool implements HeapMemoryTuneObserver {
+  private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
+
+  // Static reference to the MemStoreChunkPool
+  static MemStoreChunkPool GLOBAL_INSTANCE;
+  /** Whether we have disabled the memstore chunk pool entirely. */
+  static boolean chunkPoolDisabled = false;
+
+  private int maxCount;
+
+  // A queue of reclaimed chunks
+  private final BlockingQueue<Chunk> reclaimedChunks;
+  private final int chunkSize;
+  private final float poolSizePercentage;
+
+  /** Statistics thread schedule pool */
+  private final ScheduledExecutorService scheduleThreadPool;
+  /** Statistics thread */
+  private static final int statThreadPeriod = 60 * 5;
+  private final AtomicLong chunkCount = new AtomicLong();
+  private final AtomicLong reusedChunkCount = new AtomicLong();
+  private final boolean offheap;
+
+  MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage,
+      boolean offheap) {
+    this.maxCount = maxCount;
+    this.chunkSize = chunkSize;
+    this.poolSizePercentage = poolSizePercentage;
+    this.offheap = offheap;
+    this.reclaimedChunks = new LinkedBlockingQueue<>();
+    for (int i = 0; i < initialCount; i++) {
+      Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize);
+      chunk.init();
+      reclaimedChunks.add(chunk);
+    }
+    chunkCount.set(initialCount);
+    final String n = Thread.currentThread().getName();
+    scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+        .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
+        statThreadPeriod, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Poll a chunk from the pool and reset it if not null; otherwise create a new chunk to
+   * return, provided we have not yet created the max allowed number of chunks. When the max
+   * allowed chunks have already been created and no free chunks are available, return null. It
+   * is then the caller's responsibility to make a chunk.
+   * Note: Chunks returned by this pool must be put back to the pool after use.
+   * @return a chunk
+   * @see #putbackChunk(Chunk)
+   * @see #putbackChunks(BlockingQueue)
+   */
+  Chunk getChunk() {
+    Chunk chunk = reclaimedChunks.poll();
+    if (chunk != null) {
+      chunk.reset();
+      reusedChunkCount.incrementAndGet();
+    } else {
+      // Make a chunk iff we have not yet created the maxCount chunks
+      while (true) {
+        long created = this.chunkCount.get();
+        if (created < this.maxCount) {
+          chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize);
+          if (this.chunkCount.compareAndSet(created, created + 1)) {
+            break;
+          }
+        } else {
+          break;
+        }
+      }
+    }
+    return chunk;
+  }
+
+  /**
+   * Add the chunks to the pool; once the pool reaches its max size, the
+   * remaining chunks are skipped
+   * @param chunks
+   */
+  synchronized void putbackChunks(BlockingQueue<Chunk> chunks) {
+    int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
+    Chunk chunk = null;
+    while ((chunk = chunks.poll()) != null && toAdd > 0) {
+      reclaimedChunks.add(chunk);
+      toAdd--;
+    }
+  }
+
+  /**
+   * Add the chunk to the pool; if the pool has reached its max size, the
+   * chunk is skipped
+   * @param chunk
+   */
+  synchronized void putbackChunk(Chunk chunk) {
+    if (reclaimedChunks.size() < this.maxCount) {
+      reclaimedChunks.add(chunk);
+    }
+  }
+
+  int getPoolSize() {
+    return this.reclaimedChunks.size();
+  }
+
+  /*
+   * Only used in testing
+   */
+  void clearChunks() {
+    this.reclaimedChunks.clear();
+  }
+
+  private class StatisticsThread extends Thread {
+    StatisticsThread() {
+      super("MemStoreChunkPool.StatisticsThread");
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      logStats();
+    }
+
+    private void logStats() {
+      if (!LOG.isDebugEnabled()) return;
+      long created = chunkCount.get();
+      long reused = reusedChunkCount.get();
+      long total = created + reused;
+      LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+          + ",created chunk count=" + created
+          + ",reused chunk count=" + reused
+          + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
+              (float) reused / (float) total, 2)));
+    }
+  }
+
+  /**
+   * @return the global MemStoreChunkPool instance
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
+      justification = "Method is called by single thread at the starting of RS")
+  static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
+      float initialCountPercentage, int chunkSize, boolean offheap) {
+    if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
+    if (chunkPoolDisabled) return null;
+
+    if (poolSizePercentage <= 0) {
+      chunkPoolDisabled = true;
+      return null;
+    }
+    if (poolSizePercentage > 1.0) {
+      throw new IllegalArgumentException(
+          MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
+    }
+    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
+    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+      throw new IllegalArgumentException(
+          MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
+    }
+    int initialCount = (int) (initialCountPercentage * maxCount);
+    LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
+        + ", max count " + maxCount + ", initial count " + initialCount);
+    GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
+        offheap);
+    return GLOBAL_INSTANCE;
+  }
+
+  /**
+   * @return The singleton instance of this pool.
+   */
+  static MemStoreChunkPool getPool() {
+    return GLOBAL_INSTANCE;
+  }
+
+  int getMaxCount() {
+    return this.maxCount;
+  }
+
+  @VisibleForTesting
+  static void clearDisableFlag() {
+    chunkPoolDisabled = false;
+  }
+
+  @Override
+  public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
+    // don't do any tuning in case of offheap memstore
+    if (this.offheap) {
+      LOG.warn("Not tuning the chunk pool as it is offheap");
+      return;
+    }
+    int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
+    if (newMaxCount != this.maxCount) {
+      // We need an adjustment in the chunk count
+      if (newMaxCount > this.maxCount) {
+        // Max chunks getting increased. Just change the variable. Later calls to getChunk() will
+        // create chunks and add them to the queue
+        LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
+        this.maxCount = newMaxCount;
+      } else {
+        // Max chunks getting decreased. We may need to clear off some of the pooled chunks right
+        // away. If the extra chunks are already in use, do not pool those when we get them back
+        LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
+        this.maxCount = newMaxCount;
+        if (this.reclaimedChunks.size() > newMaxCount) {
+          synchronized (this) {
+            while (this.reclaimedChunks.size() > newMaxCount) {
+              this.reclaimedChunks.poll();
+            }
+          }
+        }
+      }
+    }
+  }
+}
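
The create-up-to-a-cap pattern in getChunk() above generalizes to the following sketch: recycle if a reclaimed item is queued, otherwise create a new one only while the created count is below the cap. BoundedRecyclingPool is an illustrative name, not an HBase class.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class BoundedRecyclingPool<T> {
  private final ConcurrentLinkedQueue<T> reclaimed = new ConcurrentLinkedQueue<>();
  private final AtomicLong created = new AtomicLong();
  private final long maxCount;
  private final Supplier<T> factory;

  public BoundedRecyclingPool(long maxCount, Supplier<T> factory) {
    this.maxCount = maxCount;
    this.factory = factory;
  }

  /** @return a recycled or newly created item, or null once the cap is hit. */
  public T get() {
    T item = reclaimed.poll();
    if (item != null) {
      return item;            // fast path: reuse a reclaimed item
    }
    while (true) {
      long c = created.get();
      if (c >= maxCount) {
        return null;          // caller must allocate outside the pool
      }
      if (created.compareAndSet(c, c + 1)) {
        return factory.get(); // we won the race to create one more
      }
    }
  }

  public void putBack(T item) {
    reclaimed.offer(item);    // unbounded here; the real pool caps this
  }
}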

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index 72e937c..f6d1607 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks and
 * then doles out slices to threads that request them. These chunks can get pooled
- * as well. See {@link ChunkCreator}.
+ * as well. See {@link MemStoreChunkPool}.
  * <p>
  * The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
  * Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  * {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this
 * cell's data, copies it into this area, and then recreates a Cell over the copied data.
  * <p>
- * @see ChunkCreator
+ * @see MemStoreChunkPool
  */
 @InterfaceAudience.Private
 public interface MemStoreLAB {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 4fba82d..4e87135 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -18,26 +18,23 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.nio.ByteBuffer;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 /**
  * A memstore-local allocation buffer.
  * <p>
@@ -58,8 +55,8 @@ import com.google.common.base.Preconditions;
  * would provide a performance improvement - probably would speed up the
  * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
  * anyway.
- * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
- * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks,
+ * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}.
+ * When the Chunk comes from the pool, it can be either an on heap or an off heap backed chunk. The chunks,
 * which this MemStoreLAB creates on its own (when no chunk is available from the pool), will
 * always be on heap backed.
  */
@@ -69,15 +66,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
   static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class);
 
   private AtomicReference<Chunk> curChunk = new AtomicReference<>();
-  // Lock to manage multiple handlers requesting for a chunk
-  private ReentrantLock lock = new ReentrantLock();
-
-  // A set of chunks contained by this memstore LAB
+  // A queue of chunks from the pool contained by this memstore LAB
+  // TODO: in the future, it would be better to have a List implementation instead of a Queue,
+  // as FIFO order is not so important here
   @VisibleForTesting
-  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
+  BlockingQueue<Chunk> pooledChunkQueue = null;
   private final int chunkSize;
   private final int maxAlloc;
-  private final ChunkCreator chunkCreator;
+  private final MemStoreChunkPool chunkPool;
 
   // This flag is for closing this instance, its set when clearing snapshot of
   // memstore
@@ -96,12 +92,20 @@ public class MemStoreLABImpl implements MemStoreLAB {
   public MemStoreLABImpl(Configuration conf) {
     chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
     maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
-    this.chunkCreator = ChunkCreator.getInstance();
+    this.chunkPool = MemStoreChunkPool.getPool();
+    // currently chunkQueue is only used for chunkPool
+    if (this.chunkPool != null) {
+      // set queue length to the chunk pool max count to avoid keeping references to
+      // too many non-reclaimable chunks
+      pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
+    }
+
     // if we don't exclude allocations >CHUNK_SIZE, we'd infinite-loop on one!
     Preconditions.checkArgument(maxAlloc <= chunkSize,
         MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
   }
 
+
   @Override
   public Cell copyCellInto(Cell cell) {
     int size = KeyValueUtil.length(cell);
@@ -114,52 +118,19 @@ public class MemStoreLABImpl implements MemStoreLAB {
     Chunk c = null;
     int allocOffset = 0;
     while (true) {
-      // Try to get the chunk
       c = getOrMakeChunk();
-      // we may get null because the some other thread succeeded in getting the lock
-      // and so the current thread has to try again to make its chunk or grab the chunk
-      // that the other thread created
       // Try to allocate from this chunk
-      if (c != null) {
-        allocOffset = c.alloc(size);
-        if (allocOffset != -1) {
-          // We succeeded - this is the common case - small alloc
-          // from a big buffer
-          break;
-        }
-        // not enough space!
-        // try to retire this chunk
-        tryRetireChunk(c);
+      allocOffset = c.alloc(size);
+      if (allocOffset != -1) {
+        // We succeeded - this is the common case - small alloc
+        // from a big buffer
+        break;
       }
+      // not enough space!
+      // try to retire this chunk
+      tryRetireChunk(c);
     }
-    return copyToChunkCell(cell, c.getData(), allocOffset, size);
-  }
-
-  /**
-   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
-   * out of it
-   */
-  private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
-    int tagsLen = cell.getTagsLength();
-    if (cell instanceof ExtendedCell) {
-      ((ExtendedCell) cell).write(buf, offset);
-    } else {
-      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
-      // other case also. The data fragments within Cell is copied into buf as in KeyValue
-      // serialization format only.
-      KeyValueUtil.appendTo(cell, buf, offset, true);
-    }
-    // TODO : write the seqid here. For writing seqId we should create a new cell type so
-    // that seqId is not used as the state
-    if (tagsLen == 0) {
-      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
-      // which directly return tagsLen as 0. So we avoid parsing many length components in
-      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
-      // call getTagsLength().
-      return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
-    } else {
-      return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
-    }
+    return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
   }
 
   /**
@@ -171,9 +142,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
     this.closed = true;
     // We can put the chunks back to the pool for reuse only when there is no
     // open scanner that will read their data
-    int count  = openScannerCount.get();
-    if(count == 0) {
-      recycleChunks();
+    if (chunkPool != null && openScannerCount.get() == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.pooledChunkQueue);
     }
   }
 
@@ -191,14 +162,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
   @Override
   public void decScannerCount() {
     int count = this.openScannerCount.decrementAndGet();
-    if (this.closed && count == 0) {
-      recycleChunks();
-    }
-  }
-
-  private void recycleChunks() {
-    if (reclaimed.compareAndSet(false, true)) {
-      chunkCreator.putbackChunks(chunks);
+    if (this.closed && chunkPool != null && count == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.pooledChunkQueue);
     }
   }
 
@@ -224,33 +190,45 @@ public class MemStoreLABImpl implements MemStoreLAB {
    * allocate a new one from the JVM.
    */
   private Chunk getOrMakeChunk() {
-    // Try to get the chunk
-    Chunk c = curChunk.get();
-    if (c != null) {
-      return c;
-    }
-    // No current chunk, so we want to allocate one. We race
-    // against other allocators to CAS in an uninitialized chunk
-    // (which is cheap to allocate)
-    if (lock.tryLock()) {
-      try {
-        // once again check inside the lock
-        c = curChunk.get();
-        if (c != null) {
-          return c;
-        }
-        c = this.chunkCreator.getChunk();
-        if (c != null) {
-          // set the curChunk. No need of CAS as only one thread will be here
-          curChunk.set(c);
-          chunks.add(c.getId());
-          return c;
+    while (true) {
+      // Try to get the chunk
+      Chunk c = curChunk.get();
+      if (c != null) {
+        return c;
+      }
+
+      // No current chunk, so we want to allocate one. We race
+      // against other allocators to CAS in an uninitialized chunk
+      // (which is cheap to allocate)
+      if (chunkPool != null) {
+        c = chunkPool.getChunk();
+      }
+      boolean pooledChunk = false;
+      if (c != null) {
+        // This is chunk from pool
+        pooledChunk = true;
+      } else {
+        c = new OnheapChunk(chunkSize); // When the chunk is not from the pool, always make it on heap.
+      }
+      if (curChunk.compareAndSet(null, c)) {
+        // we won the race - now we need to actually do the expensive
+        // allocation step
+        c.init();
+        if (pooledChunk) {
+          if (!this.closed && !this.pooledChunkQueue.offer(c)) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+                  + pooledChunkQueue.size());
+            }
+          }
         }
-      } finally {
-        lock.unlock();
+        return c;
+      } else if (pooledChunk) {
+        chunkPool.putbackChunk(c);
       }
+      // someone else won the race - that's fine, we'll try to grab theirs
+      // in the next iteration of the loop.
     }
-    return null;
   }
 
   @VisibleForTesting
@@ -258,15 +236,8 @@ public class MemStoreLABImpl implements MemStoreLAB {
     return this.curChunk.get();
   }
 
-  @VisibleForTesting
+
   BlockingQueue<Chunk> getPooledChunks() {
-    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
-    for (Integer id : this.chunks) {
-      Chunk chunk = chunkCreator.getChunk(id);
-      if (chunk != null && chunk.isFromPool()) {
-        pooledChunks.add(chunk);
-      }
-    }
-    return pooledChunks;
+    return this.pooledChunkQueue;
   }
 }
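
The curChunk race restored in getOrMakeChunk() above boils down to this pattern: many threads may obtain a candidate, exactly one installs it via CAS, and losers return pooled candidates and retry with the winner's. A generic sketch under those assumptions (CurrentChunkSlotSketch and its Pool interface are illustrative):

import java.util.concurrent.atomic.AtomicReference;

public class CurrentChunkSlotSketch<T> {
  interface Pool<U> {
    U take();        // may return null when the pool is exhausted
    void putBack(U item);
    U makeFresh();   // fallback allocation outside the pool
  }

  private final AtomicReference<T> current = new AtomicReference<>();

  T getOrMake(Pool<T> pool) {
    while (true) {
      T c = current.get();
      if (c != null) {
        return c;                        // fast path: already installed
      }
      T candidate = pool.take();
      boolean pooled = (candidate != null);
      if (!pooled) {
        candidate = pool.makeFresh();
      }
      if (current.compareAndSet(null, candidate)) {
        return candidate;                // we won the race to install it
      }
      if (pooled) {
        pool.putBack(candidate);         // loser recycles its candidate
      }
      // retry: grab the winner's chunk on the next iteration
    }
  }
}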

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
deleted file mode 100644
index a8ba50c..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-
-
-/**
- * ByteBuffer based cell which has the chunkid at the 0th offset and with no tags
- * @see MemStoreLAB
- */
-@InterfaceAudience.Private
-public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue {
-
-  public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
-    super(buf, offset, length);
-  }
-
-  public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
-    super(buf, offset, length, seqId);
-  }
-
-  @Override
-  public int getChunkId() {
-    // The chunkId is embedded at the 0th offset of the bytebuffer
-    return ByteBufferUtils.toInt(buf, 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
index e244a33..ed98cfa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
@@ -21,27 +21,34 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
+import com.google.common.base.Preconditions;
+
 /**
  * An off heap chunk implementation.
  */
 @InterfaceAudience.Private
 public class OffheapChunk extends Chunk {
 
-  OffheapChunk(int size, int id) {
-    // better if this is always created fromPool. This should not be called
-    super(size, id);
-  }
-
-  OffheapChunk(int size, int id, boolean fromPool) {
-    super(size, id, fromPool);
-    assert fromPool == true;
+  OffheapChunk(int size) {
+    super(size);
   }
 
   @Override
-  void allocateDataBuffer() {
-    if (data == null) {
-      data = ByteBuffer.allocateDirect(this.size);
-      data.putLong(0, this.getId());
+  public void init() {
+    assert nextFreeOffset.get() == UNINITIALIZED;
+    try {
+      if (data == null) {
+        data = ByteBuffer.allocateDirect(this.size);
+      }
+    } catch (OutOfMemoryError e) {
+      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+      assert failInit; // should be true.
+      throw e;
     }
+    // Mark that it's ready for use
+    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+    // We should always succeed the above CAS since only one thread
+    // calls init()!
+    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
   }
 }
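
The restored init() treats nextFreeOffset as a three-state latch: UNINITIALIZED before the buffer exists, OOM as a permanent poison value when direct allocation fails, and 0 (then growing) once the chunk is open for allocations. The final CAS is what publishes the chunk safely to allocator threads. A standalone sketch of the same once-only pattern; the sentinel names mirror Chunk's, but their numeric values here are assumptions:

    import java.util.concurrent.atomic.AtomicInteger;

    public class OnceOnlyInitSketch {
      static final int UNINITIALIZED = -1; // assumed sentinel, mirroring Chunk
      static final int OOM = -2;           // assumed sentinel, mirroring Chunk
      final AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
      byte[] data;

      void init(int size) {
        try {
          if (data == null) {
            data = new byte[size];         // stands in for ByteBuffer.allocateDirect
          }
        } catch (OutOfMemoryError e) {
          nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); // poison, never retry
          throw e;
        }
        // Publish: allocators CAS nextFreeOffset upward from 0 after this point.
        if (!nextFreeOffset.compareAndSet(UNINITIALIZED, 0)) {
          throw new IllegalStateException("Multiple threads tried to init same chunk");
        }
      }
    }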

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
index da34e24..bd33cb5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
@@ -21,25 +21,33 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
+import com.google.common.base.Preconditions;
+
 /**
  * An on heap chunk implementation.
  */
 @InterfaceAudience.Private
 public class OnheapChunk extends Chunk {
 
-  OnheapChunk(int size, int id) {
-    super(size, id);
-  }
-
-  OnheapChunk(int size, int id, boolean fromPool) {
-    super(size, id, fromPool);
+  OnheapChunk(int size) {
+    super(size);
   }
 
-  @Override
-  void allocateDataBuffer() {
-    if (data == null) {
-      data = ByteBuffer.allocate(this.size);
-      data.putLong(0, this.getId());
+  public void init() {
+    assert nextFreeOffset.get() == UNINITIALIZED;
+    try {
+      if (data == null) {
+        data = ByteBuffer.allocate(this.size);
+      }
+    } catch (OutOfMemoryError e) {
+      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+      assert failInit; // should be true.
+      throw e;
     }
+    // Mark that it's ready for use
+    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+    // We should always succeed the above CAS since only one thread
+    // calls init()!
+    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
   }
-}
\ No newline at end of file
+}
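
The restored on-heap init() is identical to the off-heap one except for ByteBuffer.allocate versus ByteBuffer.allocateDirect (and it drops the @Override annotation that OffheapChunk keeps -- harmless if Chunk.init() is abstract, as the earlier hunks suggest, but inconsistent). A hedged refactoring sketch, not HBase code: the shared body could live in the base class behind a hypothetical allocateBuffer() hook.

    import java.nio.ByteBuffer;

    abstract class ChunkSketch {
      // init() exactly as in the hunks above, but calling allocateBuffer(this.size)
      // in place of the hard-coded ByteBuffer.allocate / allocateDirect call.
      abstract ByteBuffer allocateBuffer(int size);
    }

    class OnheapChunkSketch extends ChunkSketch {
      @Override ByteBuffer allocateBuffer(int size) { return ByteBuffer.allocate(size); }
    }

    class OffheapChunkSketch extends ChunkSketch {
      @Override ByteBuffer allocateBuffer(int size) { return ByteBuffer.allocateDirect(size); }
    }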

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 6563122..82c2eab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -96,8 +96,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ChunkCreator;
-import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -2428,7 +2426,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
       final Configuration conf, final HTableDescriptor htd, boolean initialize)
       throws IOException {
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     WAL wal = createWal(conf, rootDir, info);
     return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
   }
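
This hunk sets the pattern for every test file below: the one-line ChunkCreator.initialize(...) bootstrap disappears, because with MemStoreChunkPool restored there is no process-wide singleton that must exist before a region or a MemStoreLAB can be built (the pool, when enabled, is created from HRegionServer's startup path in this patch). A hedged sketch of what test code needs after the revert -- the MemStoreLABImpl(Configuration) constructor is my reading of this codebase, so treat it as an assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
    import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;

    public class LabBootstrapSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Pre-revert, this line had to come first in every such test:
        //   ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
        // Post-revert, a LAB stands alone:
        MemStoreLAB lab = new MemStoreLABImpl(conf);
        lab.close();
      }
    }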

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 8d8b6df..422c54b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -50,10 +49,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -400,7 +397,6 @@ public class TestCoprocessorInterface {
     for(byte [] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     HRegionInfo info = new HRegionInfo(tableName, null, null, false);
     Path path = new Path(DIR + callingMethod);
     Region r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index b99087d..80d0e3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -47,12 +47,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -154,7 +152,6 @@ public class TestRegionObserverScannerOpenHook {
     for (byte[] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     Path path = new Path(DIR + callingMethod);
     WAL wal = HBaseTestingUtility.createWal(conf, path, info);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
index 15d449d..2e44dee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
@@ -34,9 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@@ -102,7 +100,6 @@ public class TestRegionObserverStacking extends TestCase {
     for(byte [] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     Path path = new Path(DIR + callingMethod);
     HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
index fae7247..f1775d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
@@ -40,10 +40,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -344,7 +342,6 @@ public class TestScannerFromBucketCache {
   private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
       String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
       byte[]... families) throws IOException {
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log");
     HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
     final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 32bce26..cc73d9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -65,8 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.ChunkCreator;
-import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -75,7 +73,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Triple;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -91,10 +88,6 @@ public class TestCatalogJanitor {
   @Rule
   public TestName name = new TestName();
 
-  @BeforeClass
-  public static void setup() throws Exception {
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
-  }
   /**
    * Mock MasterServices for tests below.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 096c5ef..418aadf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -241,7 +241,7 @@ public class TestBulkLoad {
     for (byte[] family : families) {
       hTableDescriptor.addFamily(new HColumnDescriptor(family));
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+
     // TODO We need a way to do this without creating files
     return HRegion.createHRegion(hRegionInfo,
         new Path(testFolder.newFolder().toURI()),

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 09877b0..3b4d068 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase {
     descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true);
     CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
     CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
-    ChunkCreator.chunkPoolDisabled = false;
+    MemStoreChunkPool.chunkPoolDisabled = false;
   }
 
   /* Create and test CellSet based on CellArrayMap */
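
chunkPoolDisabled moves back to MemStoreChunkPool along with the pool itself. My hedged reading of its role, inferred from this patch rather than documented: initialize() latches the flag to true when it decides no pool should be created, and later callers short-circuit on it, so a test that wants pooling must clear the latch before triggering initialization -- exactly the two lines in the hunk above:

    // Fixture pattern from the hunk above (semantics inferred, not documented):
    CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); // >0 enables pooling
    MemStoreChunkPool.chunkPoolDisabled = false;             // clear the latch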

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 9e90f3e..a888c45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +50,7 @@ import static org.junit.Assert.assertTrue;
 public class TestCompactingMemStore extends TestDefaultMemStore {
 
   private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class);
-  protected static ChunkCreator chunkCreator;
+  protected static MemStoreChunkPool chunkPool;
   protected HRegion region;
   protected RegionServicesForStores regionServicesForStores;
   protected HStore store;
@@ -66,7 +65,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
   @After
   public void tearDown() throws Exception {
-    chunkCreator.clearChunksInPool();
+    chunkPool.clearChunks();
   }
 
   @Override
@@ -85,21 +84,15 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
     HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
-    htd.addFamily(hcd);
-    HRegionInfo info =
-        new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
-    WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
-    this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
-    //this.region = hbaseUtility.createTestRegion("foobar", hcd);
+    this.region = hbaseUtility.createTestRegion("foobar", hcd);
     this.regionServicesForStores = region.getRegionServicesForStores();
     this.store = new HStore(region, hcd, conf);
 
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
         .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
-    chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
-      globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
-    assertTrue(chunkCreator != null);
+    chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
+        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
+    assertTrue(chunkPool != null);
   }
 
   /**
@@ -397,7 +390,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    int chunkCount = chunkCreator.getPoolSize();
+    int chunkCount = chunkPool.getPoolSize();
     assertTrue(chunkCount > 0);
 
   }
@@ -441,16 +434,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    assertTrue(chunkCreator.getPoolSize() == 0);
+    assertTrue(chunkPool.getPoolSize() == 0);
 
     // Chunks will be put back to pool after close scanners;
     for (KeyValueScanner scanner : scanners) {
       scanner.close();
     }
-    assertTrue(chunkCreator.getPoolSize() > 0);
+    assertTrue(chunkPool.getPoolSize() > 0);
 
     // clear chunks
-    chunkCreator.clearChunksInPool();
+    chunkPool.clearChunks();
 
     // Creating another snapshot
 
@@ -471,7 +464,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       scanner.close();
     }
     memstore.clearSnapshot(snapshot.getId());
-    assertTrue(chunkCreator.getPoolSize() > 0);
+    assertTrue(chunkPool.getPoolSize() > 0);
   }
 
   @Test
@@ -523,16 +516,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
     assertEquals(3, memstore.getActive().getCellsCount());
 
-    assertTrue(chunkCreator.getPoolSize() == 0);
+    assertTrue(chunkPool.getPoolSize() == 0);
 
     // Chunks will be put back to pool after close scanners;
     for (KeyValueScanner scanner : scanners) {
       scanner.close();
     }
-    assertTrue(chunkCreator.getPoolSize() > 0);
+    assertTrue(chunkPool.getPoolSize() > 0);
 
     // clear chunks
-    chunkCreator.clearChunksInPool();
+    chunkPool.clearChunks();
 
     // Creating another snapshot
 
@@ -560,7 +553,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       scanner.close();
     }
     memstore.clearSnapshot(snapshot.getId());
-    assertTrue(chunkCreator.getPoolSize() > 0);
+    assertTrue(chunkPool.getPoolSize() > 0);
   }
 
   //////////////////////////////////////////////////////////////////////////////
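
The restored setUp() earlier in this file sizes the pool as MemStoreChunkPool.initialize(globalMemStoreLimit, poolSizePercentage, initialCountPercentage, chunkSize, offheap); the remaining hunks then only assert that chunks flow back into that pool once scanners close and snapshots are cleared. Worked numbers for the cap, as a sanity check -- the 1 GiB figure is illustrative, not taken from the test:

    /** Hedged sizing sketch; assumes a 1 GiB global memstore limit. */
    public class PoolSizingSketch {
      public static void main(String[] args) {
        long globalMemStoreLimit = 1L << 30;   // 1 GiB, illustrative
        float poolSizePercentage = 0.2f;       // as passed in setUp() above
        int chunkSize = 2 * 1024 * 1024;       // MemStoreLABImpl.CHUNK_SIZE_DEFAULT
        int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
        System.out.println("pool capped at " + maxCount + " chunks"); // 102
      }
    }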

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 66e107a..5a48455 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -44,13 +44,17 @@ import java.util.List;
 public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
 
   private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
+  //private static MemStoreChunkPool chunkPool;
+  //private HRegion region;
+  //private RegionServicesForStores regionServicesForStores;
+  //private HStore store;
 
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
   //////////////////////////////////////////////////////////////////////////////
 
   @Override public void tearDown() throws Exception {
-    chunkCreator.clearChunksInPool();
+    chunkPool.clearChunks();
   }
 
   @Override public void setUp() throws Exception {
@@ -404,16 +408,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    assertTrue(chunkCreator.getPoolSize() == 0);
+    assertTrue(chunkPool.getPoolSize() == 0);
 
     // Chunks will be put back to pool after close scanners;
     for (KeyValueScanner scanner : scanners) {
       scanner.close();
     }
-    assertTrue(chunkCreator.getPoolSize() > 0);
+    assertTrue(chunkPool.getPoolSize() > 0);
 
     // clear chunks
-    chunkCreator.clearChunksInPool();
+    chunkPool.clearChunks();
 
     // Creating another snapshot
 
@@ -434,7 +438,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
       scanner.close();
     }
     memstore.clearSnapshot(snapshot.getId());
-    assertTrue(chunkCreator.getPoolSize() > 0);
+    assertTrue(chunkPool.getPoolSize() > 0);
   }
 
   @Test
@@ -468,7 +472,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     memstore.clearSnapshot(snapshot.getId());
 
-    int chunkCount = chunkCreator.getPoolSize();
+    int chunkCount = chunkPool.getPoolSize();
     assertTrue(chunkCount > 0);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
index e320368..8e85730 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java
@@ -164,7 +164,6 @@ public class TestCompactionArchiveConcurrentClose {
 
     HRegionFileSystem fs = new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf),
         tableDir, info);
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, tableDir);
     final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
index e7fcf18..89b2368 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java
@@ -174,7 +174,6 @@ public class TestCompactionArchiveIOException {
   private HRegion initHRegion(HTableDescriptor htd, HRegionInfo info)
       throws IOException {
     Configuration conf = testUtil.getConfiguration();
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
     Path regionDir = new Path(tableDir, info.getEncodedName());
     Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
index bff5bec..7154511 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
@@ -104,7 +104,6 @@ public class TestCompactionPolicy {
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
 
     hlog = new FSHLog(fs, basedir, logName, conf);
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
     region.close();
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 41b304b..7434eb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -85,7 +84,6 @@ public class TestDefaultMemStore {
   protected static final byte[] FAMILY = Bytes.toBytes("column");
   protected MultiVersionConcurrencyControl mvcc;
   protected AtomicLong startSeqNum = new AtomicLong(0);
-  protected ChunkCreator chunkCreator;
 
   private String getName() {
     return this.name.getMethodName();
@@ -94,17 +92,9 @@ public class TestDefaultMemStore {
   @Before
   public void setUp() throws Exception {
     internalSetUp();
-    // no pool
-    this.chunkCreator =
-        ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     this.memstore = new DefaultMemStore();
   }
 
-  @AfterClass
-  public static void tearDownClass() throws Exception {
-    ChunkCreator.getInstance().clearChunkIds();
-  }
-
   protected void internalSetUp() throws Exception {
     this.mvcc = new MultiVersionConcurrencyControl();
   }
@@ -139,9 +129,7 @@ public class TestDefaultMemStore {
       assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
       // make sure chunk size increased even when writing the same cell, if using MSLAB
       if (msLab instanceof MemStoreLABImpl) {
-        // since we add the chunkID at the 0th offset of the chunk and the
-        // chunkid is a long we need to account for those 8 bytes
-        assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG,
+        assertEquals(2 * Segment.getCellLength(kv),
           ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset());
       }
     } else {
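
The assertion change above is the cleanest before/after picture of the embedding cost: with a long chunk id stamped at offset 0, the first cell starts at offset 8, so writing the same cell twice leaves nextFreeOffset at 2 * cellLength + Bytes.SIZEOF_LONG; with the embedding reverted it is simply 2 * cellLength. In illustrative numbers, for an assumed 50-byte cell:

    // Worked arithmetic (the 50-byte cell length is an assumed example):
    //   chunk-id world : nextFreeOffset = 8 + 50 + 50 = 108
    //   reverted world : nextFreeOffset =     50 + 50 = 100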

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 24e850d..73fb9cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -266,7 +266,6 @@ public class TestFailedAppendAndSync {
    */
   public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
   throws IOException {
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
       wal, COLUMN_FAMILY_BYTES);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecdfb823/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index 0f24a24..b416c7d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -153,7 +153,7 @@ public class TestHMobStore {
 
     htd.addFamily(hcd);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, basedir);
     final WALFactory wals = new WALFactory(walConf, null, methodName);