Posted to commits@hbase.apache.org by an...@apache.org on 2017/07/05 09:56:59 UTC

hbase git commit: HBASE-18010: CellChunkMap integration into CompactingMemStore. Continuation of the previous commit

Repository: hbase
Updated Branches:
  refs/heads/master 8ac430841 -> 284321485


HBASE-18010: CellChunkMap integration into CompactingMemStore. Continuation of the previous commit


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28432148
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28432148
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28432148

Branch: refs/heads/master
Commit: 2843214857f69698c0fc95ae48461ed1ec8f94de
Parents: 8ac4308
Author: anastas <an...@yahoo-inc.com>
Authored: Wed Jul 5 12:56:45 2017 +0300
Committer: anastas <an...@yahoo-inc.com>
Committed: Wed Jul 5 12:56:45 2017 +0300

----------------------------------------------------------------------
 .../regionserver/CSLMImmutableSegment.java      |  53 ++
 .../regionserver/CellArrayImmutableSegment.java | 132 +++++
 .../regionserver/CellChunkImmutableSegment.java | 198 +++++++
 .../TestCompactingToCellFlatMapMemStore.java    | 566 +++++++++++++++++++
 4 files changed, 949 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/28432148/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
new file mode 100644
index 0000000..2f1724a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+/**
+ * CSLMImmutableSegment extends the API supported by a {@link Segment}
+ * and {@link ImmutableSegment}. This immutable segment works with a CellSet
+ * that uses a ConcurrentSkipListMap (CSLM) delegatee.
+ */
+@InterfaceAudience.Private
+public class CSLMImmutableSegment extends ImmutableSegment {
+  public static final long DEEP_OVERHEAD_CSLM =
+      ImmutableSegment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
+
+  /**------------------------------------------------------------------------
+   * Copy C-tor to be used when a new CSLMImmutableSegment is built from a mutable one.
+   * This C-tor should be used when the active MutableSegment is pushed into the compaction
+   * pipeline and becomes an ImmutableSegment.
+   */
+  protected CSLMImmutableSegment(Segment segment) {
+    super(segment);
+    // update the segment metadata heap size
+    incSize(0, -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
+  }
+
+  @Override protected boolean canBeFlattened() {
+    return true;
+  }
+}
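
For readers following the flattening flow: CSLMImmutableSegment is the only segment kind
whose canBeFlattened() returns true; the two flat segments added below are built from it
through their copy constructors. A minimal caller sketch, illustration only and not part
of this commit ("cslmSegment" and "useChunkMap" are hypothetical names; the constructors
are protected, so such a caller would sit in org.apache.hadoop.hbase.regionserver):

    // Hypothetical sketch: turning a CSLM-backed immutable segment into a flat one
    MemstoreSize sizeDelta = new MemstoreSize();  // collects the heap-size change
    ImmutableSegment flat = useChunkMap
        ? new CellChunkImmutableSegment(cslmSegment, sizeDelta)  // index stored in Chunks
        : new CellArrayImmutableSegment(cslmSegment, sizeDelta); // index stored in a Cell[]
    // sizeDelta now carries the metadata delta to report to the region's memstore size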

http://git-wip-us.apache.org/repos/asf/hbase/blob/28432148/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
new file mode 100644
index 0000000..fb5f780
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import java.io.IOException;
+
+/**
+ * CellArrayImmutableSegment extends the API supported by a {@link Segment}
+ * and {@link ImmutableSegment}. This immutable segment works with a CellSet
+ * that uses a CellArrayMap delegatee.
+ */
+@InterfaceAudience.Private
+public class CellArrayImmutableSegment extends ImmutableSegment {
+
+  public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
+
+  /////////////////////  CONSTRUCTORS  /////////////////////
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellArrayImmutableSegment is the result of compaction of a
+   * list of older ImmutableSegments.
+   * The given iterator returns the Cells that "survived" the compaction.
+   */
+  protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+    super(null, comparator, memStoreLAB); // initialize the CellSet with null
+    incSize(0, DEEP_OVERHEAD_CAM);
+    // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
+    initializeCellSet(numOfCells, iterator, action);
+  }
+
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellArrayImmutableSegment is built as a result of flattening
+   * of a CSLMImmutableSegment.
+   * The cells of the given segment are moved into the new flat representation.
+   */
+  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
+    super(segment); // initialize the superclass
+    incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
+    int numOfCells = segment.getCellsCount();
+    // build the new CellSet based on CellArrayMap and update the CellSet of this Segment
+    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+    // adjust the metadata size: replace the per-cell SkipList entry overhead with the
+    // CellArrayMap entry overhead (reinitializeCellSet does not take care of the sizes)
+    long newSegmentSizeDelta =
+        numOfCells * (indexEntrySize() - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    incSize(0, newSegmentSizeDelta);
+    memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return ClassSize.CELL_ARRAY_MAP_ENTRY;
+  }
+
+  @Override
+  protected boolean canBeFlattened() {
+    return false;
+  }
+
+  /////////////////////  PRIVATE METHODS  /////////////////////
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellArrayMap from compacting iterator
+  private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
+      MemStoreCompactor.Action action) {
+
+    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
+    int i = 0;
+    while (iterator.hasNext()) {
+      Cell c = iterator.next();
+      // The scanner behind the iterator is doing all the elimination logic
+      if (action == MemStoreCompactor.Action.MERGE) {
+        // in a merge we just move the Cell reference, without an MSLAB copy;
+        // the sizes still need to be updated in the new segment
+        cells[i] = c;
+      } else {
+        // now we just copy it to the new segment (also MSLAB copy)
+        cells[i] = maybeCloneWithAllocator(c);
+      }
+      // second parameter true, because in compaction/merge the addition of the cell to new segment
+      // is always successful
+      updateMetaInfo(c, true, null); // updates the size per cell
+      i++;
+    }
+    // build the immutable CellSet
+    CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
+    this.setCellSet(null, new CellSet(cam));   // update the CellSet of this Segment
+  }
+
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellArrayMap from the current ConcurrentSkipListMap-based CellSet
+  // (without a compacting iterator)
+  private void reinitializeCellSet(
+      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
+    Cell curCell;
+    int idx = 0;
+    try {
+      while ((curCell = segmentScanner.next()) != null) {
+        cells[idx++] = curCell;
+      }
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    } finally {
+      segmentScanner.close();
+    }
+    // build the immutable CellSet
+    CellArrayMap cam = new CellArrayMap(CellComparator.COMPARATOR, cells, 0, idx, false);
+    this.setCellSet(oldCellSet, new CellSet(cam));   // update the CellSet of this Segment
+  }
+
+}
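
The flattening constructor above charges only the index-entry difference per cell; the
cell data itself is shared with the old CSLM-backed CellSet. A worked sketch of that
arithmetic, illustration only (the ClassSize constants are JVM-layout estimates, and
"numOfCells" stands for the segment's cell count):

    // Per-cell heap delta when a CSLM index entry is replaced by a CellArrayMap entry;
    // negative whenever the flat entry is cheaper than a skip-list node
    long perCell = ClassSize.CELL_ARRAY_MAP_ENTRY - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
    long segmentDelta = numOfCells * perCell;  // the value applied via incSize(0, segmentDelta)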

http://git-wip-us.apache.org/repos/asf/hbase/blob/28432148/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
new file mode 100644
index 0000000..cdda279
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * CellChunkImmutableSegment extends the API supported by a {@link Segment}
+ * and {@link ImmutableSegment}. This immutable segment works with a CellSet
+ * that uses a CellChunkMap delegatee.
+ */
+@InterfaceAudience.Private
+public class CellChunkImmutableSegment extends ImmutableSegment {
+
+  public static final long DEEP_OVERHEAD_CCM =
+      ImmutableSegment.DEEP_OVERHEAD + ClassSize.CELL_CHUNK_MAP;
+
+  /////////////////////  CONSTRUCTORS  /////////////////////
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellChunkImmutableSegment is built as a result of compaction/merge
+   * of a list of older ImmutableSegments.
+   * The given iterator returns the Cells that "survived" the compaction.
+   */
+  protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+    super(null, comparator, memStoreLAB); // initialize the CellSet with null
+    incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
+    // build the new CellSet based on CellChunkMap and update the CellSet of the new Segment
+    initializeCellSet(numOfCells, iterator, action);
+  }
+
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellChunkImmutableSegment is built as a result of flattening
+   * of a CSLMImmutableSegment.
+   * The cells of the given segment are referenced from the new CellChunkMap index.
+   */
+  protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
+    super(segment); // initialize the superclass
+    incSize(0, DEEP_OVERHEAD_CCM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
+    int numOfCells = segment.getCellsCount();
+    // build the new CellSet based on CellChunkMap
+    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+    // adjust the metadata size: replace the per-cell SkipList entry overhead with the
+    // CellChunkMap entry overhead; Cell object sizes are deducted as well, via indexEntrySize()
+    // (reinitializeCellSet does not take care of the sizes)
+    long newSegmentSizeDelta =
+        numOfCells * (indexEntrySize() - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    incSize(0, newSegmentSizeDelta);
+    memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return (ClassSize.CELL_CHUNK_MAP_ENTRY - KeyValue.FIXED_OVERHEAD);
+  }
+
+  @Override
+  protected boolean canBeFlattened() {
+    return false;
+  }
+
+  /////////////////////  PRIVATE METHODS  /////////////////////
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellChunkMap from compacting iterator
+  private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
+      MemStoreCompactor.Action action) {
+
+    // calculate how many chunks we will need for index
+    int chunkSize = ChunkCreator.getInstance().getChunkSize();
+    int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
+    int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
+    int numOfCellsAfterCompaction = 0;
+    int currentChunkIdx = 0;
+    int offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+    // all index Chunks are allocated from ChunkCreator
+    Chunk[] chunks = new Chunk[numberOfChunks];
+    for (int i = 0; i < numberOfChunks; i++) {
+      chunks[i] = this.getMemStoreLAB().getNewExternalChunk();
+    }
+    while (iterator.hasNext()) {        // the iterator hides the elimination logic for compaction
+      Cell c = iterator.next();
+      numOfCellsAfterCompaction++;
+      assert (c instanceof ByteBufferKeyValue); // nothing but ByteBufferKeyValue should get here
+      if (offsetInCurrentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
+        currentChunkIdx++;              // continue to the next index chunk
+        offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+      }
+      if (action == MemStoreCompactor.Action.COMPACT) {
+        c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy)
+      }
+      offsetInCurrentChunk = // add the Cell reference to the index chunk
+          createCellReference((ByteBufferKeyValue) c, chunks[currentChunkIdx].getData(),
+              offsetInCurrentChunk);
+      // the sizes still need to be updated in the new segment
+      // second parameter true, because in compaction/merge the addition of the cell to new segment
+      // is always successful
+      updateMetaInfo(c, true, null); // updates the size per cell
+    }
+    // build the immutable CellSet
+    CellChunkMap ccm =
+        new CellChunkMap(CellComparator.COMPARATOR, chunks, 0, numOfCellsAfterCompaction, false);
+    this.setCellSet(null, new CellSet(ccm));  // update the CellSet of this Segment
+  }
+
+  /*------------------------------------------------------------------------*/
+  // Create CellSet based on CellChunkMap from the current ConcurrentSkipListMap-based CellSet
+  // (without a compacting iterator)
+  // This is a service for non-flat immutable segments
+  // Assumption: cells do not exceed chunk size!
+  private void reinitializeCellSet(
+      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+    Cell curCell;
+    // calculate how many chunks we will need for metadata
+    int chunkSize = ChunkCreator.getInstance().getChunkSize();
+    int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
+    int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
+    // all index Chunks are allocated from ChunkCreator
+    Chunk[] chunks = new Chunk[numberOfChunks];
+    for (int i = 0; i < numberOfChunks; i++) {
+      chunks[i] = this.getMemStoreLAB().getNewExternalChunk();
+    }
+
+    int currentChunkIdx = 0;
+    int offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+
+    try {
+      while ((curCell = segmentScanner.next()) != null) {
+        assert (curCell instanceof ByteBufferKeyValue); // nothing else should get here
+        if (offsetInCurrentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
+          // continue to the next metadata chunk
+          currentChunkIdx++;
+          offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+        }
+        offsetInCurrentChunk =
+            createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(),
+                offsetInCurrentChunk);
+      }
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    } finally {
+      segmentScanner.close();
+    }
+
+    CellChunkMap ccm = new CellChunkMap(CellComparator.COMPARATOR, chunks, 0, numOfCells, false);
+    this.setCellSet(oldCellSet, new CellSet(ccm)); // update the CellSet of this Segment
+  }
+
+  /*------------------------------------------------------------------------*/
+  // for a given cell, write the cell representation on the index chunk
+  private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) {
+    int offset = idxOffset;
+    int dataChunkID = cell.getChunkId();
+    // ensure a strong pointer to the data chunk, as the index no longer points directly to it
+    Chunk c = ChunkCreator.getInstance().saveChunkFromGC(dataChunkID);
+    // if c is null, this cell's chunk was already released, which shouldn't happen
+    assert (c != null);
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID);    // write data chunk id
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset());          // offset
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length
+    offset = ByteBufferUtils.putLong(idxBuffer, offset, cell.getSequenceId());     // seqId
+
+    return offset;
+  }
+
+  private int calculateNumberOfChunks(int numOfCells, int numOfCellsInChunk) {
+    int numberOfChunks = numOfCells / numOfCellsInChunk;
+    if (numOfCells % numOfCellsInChunk != 0) { // if cells cannot be divided evenly between chunks
+      numberOfChunks++;                        // add one additional chunk
+    }
+    return numberOfChunks;
+  }
+}
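
createCellReference() above writes a fixed-width record per cell into the index chunk:
an int chunk id, an int offset, an int length and a long sequence id, i.e. 3 * 4 + 8 = 20
bytes per entry. A hedged sketch of reading one record back, illustration only
("idxBuffer" and "entryOffset" are hypothetical names; ByteBufferUtils.toInt/toLong and
Bytes.SIZEOF_INT are existing HBase utilities):

    // Decode one cell reference previously written by createCellReference()
    int chunkId  = ByteBufferUtils.toInt(idxBuffer, entryOffset);
    int cellOff  = ByteBufferUtils.toInt(idxBuffer, entryOffset + Bytes.SIZEOF_INT);
    int cellLen  = ByteBufferUtils.toInt(idxBuffer, entryOffset + 2 * Bytes.SIZEOF_INT);
    long seqId   = ByteBufferUtils.toLong(idxBuffer, entryOffset + 3 * Bytes.SIZEOF_INT);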

http://git-wip-us.apache.org/repos/asf/hbase/blob/28432148/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
new file mode 100644
index 0000000..6011af7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Threads;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Compacting memstore test case, parameterized over the flat immutable-index types.
+ */
+@Category({RegionServerTests.class, MediumTests.class})
+@RunWith(Parameterized.class)
+public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {
+  @Parameterized.Parameters
+  public static Object[] data() {
+    return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
+  }
+  private static final Log LOG = LogFactory.getLog(TestCompactingToCellFlatMapMemStore.class);
+  public final boolean toCellChunkMap;
+  Configuration conf;
+  //////////////////////////////////////////////////////////////////////////////
+  // Helpers
+  //////////////////////////////////////////////////////////////////////////////
+  public TestCompactingToCellFlatMapMemStore(String type) {
+    // use equals() rather than ==; reference comparison of strings is fragile
+    toCellChunkMap = "CHUNK_MAP".equals(type);
+  }
+
+  @Override public void tearDown() throws Exception {
+    chunkCreator.clearChunksInPool();
+  }
+
+  @Override public void setUp() throws Exception {
+
+    compactingSetUp();
+    this.conf = HBaseConfiguration.create();
+
+    // set memstore to do data compaction
+    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(MemoryCompactionPolicy.EAGER));
+
+    this.memstore =
+        new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
+            regionServicesForStores, MemoryCompactionPolicy.EAGER);
+  }
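
Each test below that runs with toCellChunkMap == true flips the memstore to the
CellChunkMap index with the same two-step pattern: set the index-type key in the
configuration, then tell the memstore to pick it up. A condensed restatement of that
pattern (the constants and the setIndexType() call are exactly as they appear in the
tests below; the trailing comment is our reading of it):

    // ask the compacting memstore to flatten into CellChunkMap-based segments
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
        String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
    ((CompactingMemStore) memstore).setIndexType(); // re-reads the index type from conf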
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Compaction tests
+  //////////////////////////////////////////////////////////////////////////////
+  public void testCompaction1Bucket() throws IOException {
+    int counter = 0;
+    String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
+    if (toCellChunkMap) {
+      // set memstore to flat into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+
+    // test 1 bucket
+    long totalCellsLen = addRowsByKeys(memstore, keys1);
+    long cellBeforeFlushSize = cellBeforeFlushSize();
+    long cellAfterFlushSize  = cellAfterFlushSize();
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
+
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+
+    assertEquals(4, memstore.getActive().getCellsCount());
+    ((CompactingMemStore) memstore).flushInMemory();    // push keys to pipeline and compact
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated and the compaction will remove it. All cells are the same
+    // size, so adjust totalCellsLen accordingly
+    totalCellsLen = (totalCellsLen * 3) / 4;
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+
+    totalHeapSize =
+        3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+            + (toCellChunkMap ?
+            CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+            CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(3, counter);
+    MemstoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(3, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  public void testCompaction2Buckets() throws IOException {
+    if (toCellChunkMap) {
+      // set memstore to flat into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1);     // INSERT 4
+    long cellBeforeFlushSize = cellBeforeFlushSize();
+    long cellAfterFlushSize = cellAfterFlushSize();
+    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    int counter = 0;                                          // COMPACT 4->3
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(3, counter);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated and the compaction will remove it. All cells are the same
+    // size, so adjust totalCellsLen
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+        + (toCellChunkMap ?
+        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2);   // INSERT 3 (3+3=6)
+    long totalHeapSize2 = 3 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    assertEquals(0, memstore.getSnapshot().getCellsCount()); // COMPACT 6->4
+    counter = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(4, counter);
+    totalCellsLen2 = totalCellsLen2 / 3; // 2 cells duplicated in set 2
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    totalHeapSize2 = 1 * cellAfterFlushSize;
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    MemstoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  public void testCompaction3Buckets() throws IOException {
+    if (toCellChunkMap) {
+      // set memstore to flat into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+    String[] keys3 = { "D", "B", "B" };
+
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long cellBeforeFlushSize = cellBeforeFlushSize();
+    long cellAfterFlushSize = cellAfterFlushSize();
+    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1, region.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    MemstoreSize size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated and the compaction will remove it. All cells are the same
+    // size, so adjust totalCellsLen
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+        + (toCellChunkMap ?
+        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    long totalHeapSize2 = 3 * cellBeforeFlushSize;
+
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).disableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
+    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    long totalCellsLen3 = addRowsByKeys(memstore, keys3);
+    long totalHeapSize3 = 3 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
+        ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).enableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
+    // Out of total 10, only 4 cells are unique
+    totalCellsLen2 = totalCellsLen2 / 3; // 2 out of 3 cells are duplicated
+    totalCellsLen3 = 0; // all duplicated cells
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    // Only 4 unique cells left
+    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+        + (toCellChunkMap ?
+        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());
+
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Merging tests
+  //////////////////////////////////////////////////////////////////////////////
+  @Test
+  public void testMerging() throws IOException {
+    if (toCellChunkMap) {
+      // set memstore to flat into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+    String[] keys1 = { "A", "A", "B", "C", "F", "H"};
+    String[] keys2 = { "A", "B", "D", "G", "I", "J"};
+    String[] keys3 = { "D", "B", "B", "E" };
+
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(compactionType));
+    ((CompactingMemStore)memstore).initiateType(compactionType);
+    addRowsByKeys(memstore, keys1);
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline; should not compact
+
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys2); // also should only flatten
+
+    int counter2 = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter2 += s.getCellsCount();
+    }
+    assertEquals(12, counter2);
+
+    ((CompactingMemStore) memstore).disableCompaction();
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    int counter3 = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter3 += s.getCellsCount();
+    }
+    assertEquals(12, counter3);
+
+    addRowsByKeys(memstore, keys3);
+
+    int counter4 = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter4 += s.getCellsCount();
+    }
+    assertEquals(16, counter4);
+
+    ((CompactingMemStore) memstore).enableCompaction();
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    int counter = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(16, counter);
+
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    ImmutableSegment s = memstore.getSnapshot();
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  @Test
+  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
+    String[] keys1 = { "A", "B", "C" }; // A, B, C
+    addRowsByKeysWith50Cols(memstore, keys1);
+    // this should only flatten as there are no duplicates
+    ((CompactingMemStore) memstore).flushInMemory();
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
+    // seek
+    int count = 0;
+    for (int i = 0; i < scanners.size(); i++) {
+      scanners.get(i).seek(KeyValue.LOWESTKEY);
+      while (scanners.get(i).next() != null) {
+        count++;
+      }
+    }
+    assertEquals("the count should be ", count, 150);
+    for (int i = 0; i < scanners.size(); i++) {
+      scanners.get(i).close();
+    }
+  }
+
+  @Test
+  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
+    String[] keys1 = { "A", "B", "C" }; // A, B, C
+    addRowsByKeysWith50Cols(memstore, keys1);
+    // this should only flatten as there are no duplicates
+    ((CompactingMemStore) memstore).flushInMemory();
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    // just count the cells via the merging iterator
+    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
+        ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
+        CellComparator.COMPARATOR, 10);
+    int cnt = 0;
+    try {
+      while (itr.next() != null) {
+        cnt++;
+      }
+    } finally {
+      itr.close();
+    }
+    assertEquals("the count should be ", cnt, 150);
+  }
+
+  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
+    byte[] fam = Bytes.toBytes("testfamily");
+    for (int i = 0; i < keys.length; i++) {
+      long timestamp = System.currentTimeMillis();
+      Threads.sleep(1); // to make sure each kv gets a different ts
+      byte[] row = Bytes.toBytes(keys[i]);
+      for (int j = 0; j < 50; j++) {
+        byte[] qf = Bytes.toBytes("testqualifier"+j);
+        byte[] val = Bytes.toBytes(keys[i] + j);
+        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
+        hmc.add(kv, null);
+      }
+    }
+  }
+
+  @Override
+  @Test
+  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] qf6 = Bytes.toBytes("testqualifier6");
+    byte[] qf7 = Bytes.toBytes("testqualifier7");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
+    assertEquals(2, memstore.getActive().getCellsCount());
+
+    // opening a scanner before clearing the snapshot
+    List<KeyValueScanner> scanners = memstore.getScanners(0);
+    // The chunks should not be put back into the pool, since some scanners are
+    // still open over their data
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
+    memstore.clearSnapshot(snapshot.getId());
+
+    assertTrue(chunkCreator.getPoolSize() == 0);
+
+    // Chunks will be put back to the pool after the scanners are closed
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    assertTrue(chunkCreator.getPoolSize() > 0);
+
+    // clear chunks
+    chunkCreator.clearChunksInPool();
+
+    // Creating another snapshot
+
+    snapshot = memstore.snapshot();
+    // Adding more value
+    memstore.add(new KeyValue(row, fam, qf6, val), null);
+    memstore.add(new KeyValue(row, fam, qf7, val), null);
+    // opening scanners
+    scanners = memstore.getScanners(0);
+    // close scanners before clearing the snapshot
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    // Since no scanner is open, the chunks of the snapshot should be put back
+    // to the pool
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
+    memstore.clearSnapshot(snapshot.getId());
+    assertTrue(chunkCreator.getPoolSize() > 0);
+  }
+
+  @Test
+  public void testPuttingBackChunksAfterFlushing() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
+    assertEquals(2, memstore.getActive().getCellsCount());
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
+    memstore.clearSnapshot(snapshot.getId());
+
+    int chunkCount = chunkCreator.getPoolSize();
+    assertTrue(chunkCount > 0);
+  }
+
+  private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf = Bytes.toBytes("testqualifier");
+    MemstoreSize memstoreSize = new MemstoreSize();
+    for (int i = 0; i < keys.length; i++) {
+      long timestamp = System.currentTimeMillis();
+      Threads.sleep(1); // to make sure each kv gets a different ts
+      byte[] row = Bytes.toBytes(keys[i]);
+      byte[] val = Bytes.toBytes(keys[i] + i);
+      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
+      hmc.add(kv, memstoreSize);
+      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
+    }
+    regionServicesForStores.addMemstoreSize(memstoreSize);
+    return memstoreSize.getDataSize();
+  }
+
+  private long cellBeforeFlushSize() {
+    // make one cell
+    byte[] row = Bytes.toBytes("A");
+    byte[] val = Bytes.toBytes("A" + 0);
+    KeyValue kv =
+        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
+            System.currentTimeMillis(), val);
+    return ClassSize.align(
+        ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv));
+  }
+
+  private long cellAfterFlushSize() {
+    // make one cell
+    byte[] row = Bytes.toBytes("A");
+    byte[] val = Bytes.toBytes("A" + 0);
+    KeyValue kv =
+        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
+            System.currentTimeMillis(), val);
+
+    return toCellChunkMap
+        ? ClassSize.align(ClassSize.CELL_CHUNK_MAP_ENTRY + KeyValueUtil.length(kv))
+        : ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD
+            + KeyValueUtil.length(kv));
+  }
+}