Posted to commits@hbase.apache.org by an...@apache.org on 2016/10/30 06:51:09 UTC

[1/4] hbase git commit: HBASE-16747 Track memstore data size and heap overhead separately.

Repository: hbase
Updated Branches:
  refs/heads/master 6127753b6 -> ba6d95232

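The heart of the change: memstore size is no longer a single long but a pair of numbers, the size of the cell data itself and the JVM heap overhead spent holding it. The hunks in this mail only show call sites; MemstoreSize.java is a new file in this commit (see the diffstat under [4/4] below). Here is a minimal sketch of the shape those call sites imply, assuming nothing beyond the accessors, the EMPTY_SIZE constant, and the constructor arguments actually used in the diffs; everything else about the real class is an assumption.

    // Minimal sketch inferred from call sites in this patch. The real class
    // is hbase-server/.../regionserver/MemstoreSize.java, added by this commit.
    public class MemstoreSize {

      public static final MemstoreSize EMPTY_SIZE = new MemstoreSize(0, 0);

      private long dataSize;      // bytes of cell data (keys + values)
      private long heapOverhead;  // JVM bookkeeping cost of holding those cells

      public MemstoreSize() {     // assumed; the diffs only show the 2-arg form
        this(0, 0);
      }

      public MemstoreSize(long dataSize, long heapOverhead) {
        this.dataSize = dataSize;
        this.heapOverhead = heapOverhead;
      }

      public long getDataSize() { return dataSize; }

      public long getHeapOverhead() { return heapOverhead; }

      // The assertEquals(MemstoreSize.EMPTY_SIZE, ...) calls in the tests
      // below imply value equality, so the real class presumably overrides
      // equals() and hashCode() as well.
    }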

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 277eb48..35159b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -127,10 +127,10 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
-    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
         FlushNonSloppyStoresFirstPolicy.class.getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024);
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
     // set memstore to do data compaction
     conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
@@ -164,9 +164,9 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
 
     // Find the sizes of the memstores of each CF.
-    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
 
     // Get the overall smallest LSN in the region's memstores.
     long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
@@ -188,22 +188,18 @@ public class TestWalAndCompactingMemStoreFlush {
     // Some other sanity checks.
     assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
     assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
-    assertTrue(cf1MemstoreSizePhaseI > 0);
-    assertTrue(cf2MemstoreSizePhaseI > 0);
-    assertTrue(cf3MemstoreSizePhaseI > 0);
+    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
     String msg = "totalMemstoreSize="+totalMemstoreSize +
-        " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
-        " CompactingMemStore.DEEP_OVERHEAD="+CompactingMemStore.DEEP_OVERHEAD +
         " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
         " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
         " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
-    assertEquals(msg,
-        totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
-            + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
-        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // We have big compacting memstore CF1 and two small memstores:
@@ -219,9 +215,9 @@ public class TestWalAndCompactingMemStoreFlush {
     region.flush(false);
 
     // Recalculate everything
-    long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
 
     long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
@@ -230,29 +226,21 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
 
-    s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
-        + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
-        + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
+    s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
         + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
         + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
 
-    // CF1 was flushed to memory, but there is nothing to compact, and CF! was flattened
-    assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
+    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
+    assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize());
+    assertTrue(cf1MemstoreSizePhaseII.getHeapOverhead() < cf1MemstoreSizePhaseI.getHeapOverhead());
 
     // CF2 should become empty
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseII);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
 
    // verify that CF3 was flushed to memory and was compacted (this is an approximation check)
-    assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD
-        + ImmutableSegment.DEEP_OVERHEAD_CAM
-        + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII);
-
-    // CF3 was compacted and flattened!
-    assertTrue("\n<<< Size of CF3 in phase I - " + cf3MemstoreSizePhaseI
-            + ", size of CF3 in phase II - " + cf3MemstoreSizePhaseII + "\n",
-        cf3MemstoreSizePhaseI / 2 > cf3MemstoreSizePhaseII);
-
+    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
+    assertTrue(
+        cf3MemstoreSizePhaseI.getHeapOverhead() / 2 > cf3MemstoreSizePhaseII.getHeapOverhead());
 
     // Now the smallest LSN in the region should be the same as the smallest
     // LSN in the memstore of CF1.
@@ -270,7 +258,7 @@ public class TestWalAndCompactingMemStoreFlush {
         + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n";
 
     // How much does the CF1 memstore occupy? Will be used later.
-    long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
     long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
 
     s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
@@ -284,9 +272,9 @@ public class TestWalAndCompactingMemStoreFlush {
     region.flush(false);
 
     // Recalculate everything
-    long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();
 
     long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
@@ -306,9 +294,8 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // CF1's pipeline component (inserted before first flush) should be flushed to disk
     // CF2 should be flushed to disk
-    assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseIV);
+    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseIV);
 
     // CF3 shouldn't have been touched.
     assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
@@ -322,34 +309,25 @@ public class TestWalAndCompactingMemStoreFlush {
     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // Trying to clean the existing memstores, CF2 all flushed to disk. The single
     // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
-    // Note that active set of CF3 is empty
-    // But active set of CF1 is not yet empty
     region.flush(true);
 
     // Recalculate everything
-    long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
     long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
-    assertTrue(
-        CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD <= cf1MemstoreSizePhaseV);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseV);
-    assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf3MemstoreSizePhaseV);
-
-    region.flush(true); // flush once again in order to be sure that everything is empty
-    assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        region.getStore(FAMILY1).getMemStoreSize());
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSizePhaseV);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseV);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSizePhaseV);
 
     // What happens when we hit the memstore limit, but we are not able to find
     // any Column Family above the threshold?
     // In that case, we should flush all the CFs.
 
-    // The memstore limit is 200*1024 and the column family flush threshold is
-    // around 50*1024. We try to just hit the memstore limit with each CF's
+    // The memstore limit is 100*1024 and the column family flush threshold is
+    // around 25*1024. We try to just hit the memstore limit with each CF's
     // memstore being below the CF flush threshold.
     for (int i = 1; i <= 300; i++) {
       region.put(createPut(1, i));
@@ -384,10 +362,10 @@ public class TestWalAndCompactingMemStoreFlush {
     /* SETUP */
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
-    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
         FlushNonSloppyStoresFirstPolicy.class.getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024);
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
     // set memstore to index-compaction
     conf.set("hbase.hregion.compacting.memstore.type", "index-compaction");
@@ -421,9 +399,9 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
     // Find the sizes of the memstores of each CF.
-    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
     // Get the overall smallest LSN in the region's memstores.
     long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
@@ -436,18 +414,14 @@ public class TestWalAndCompactingMemStoreFlush {
     // Some other sanity checks.
     assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
     assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
-    assertTrue(cf1MemstoreSizePhaseI > 0);
-    assertTrue(cf2MemstoreSizePhaseI > 0);
-    assertTrue(cf3MemstoreSizePhaseI > 0);
+    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    assertEquals(
-        totalMemstoreSizePhaseI
-            + 1 * DefaultMemStore.DEEP_OVERHEAD
-            + 2 * CompactingMemStore.DEEP_OVERHEAD
-            + 3 * MutableSegment.DEEP_OVERHEAD,
-        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
     /*------------------------------------------------------------------------------*/
     /* PHASE I - Flush */
@@ -475,9 +449,9 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /* PHASE II - collect sizes */
     // Recalculate everything
-    long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
     long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     // Find the smallest LSNs for edits wrt to each CF.
@@ -487,13 +461,15 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /* PHASE II - validation */
     // CF1 was flushed to memory, should be flattened and take less space
-    assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
+    assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
+    assertTrue(cf1MemstoreSizePhaseII.getHeapOverhead() < cf1MemstoreSizePhaseI.getHeapOverhead());
     // CF2 should become empty
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseII);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
     // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
    // if compacted, CF3 would be at most half the size, because every key was duplicated
-    assertTrue(cf3MemstoreSizePhaseI / 2 < cf3MemstoreSizePhaseII);
+    assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
+    assertTrue(
+        cf3MemstoreSizePhaseI.getHeapOverhead() / 2 < cf3MemstoreSizePhaseII.getHeapOverhead());
 
     // Now the smallest LSN in the region should be the same as the smallest
     // LSN in the memstore of CF1.
@@ -501,14 +477,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
     // items in CF1/2
-    assertEquals(
-        totalMemstoreSizePhaseII
-            + 1 * DefaultMemStore.DEEP_OVERHEAD
-            + 2 * CompactingMemStore.DEEP_OVERHEAD
-            + 3 * MutableSegment.DEEP_OVERHEAD
-            + 2 * CompactionPipeline.ENTRY_OVERHEAD
-            + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM,
-        cf1MemstoreSizePhaseII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII);
+    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
+        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
 
     /*------------------------------------------------------------------------------*/
     /*------------------------------------------------------------------------------*/
@@ -528,7 +498,7 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /* PHASE III - collect sizes */
     // How much does the CF1 memstore occupy now? Will be used later.
-    long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
     long totalMemstoreSizePhaseIII = region.getMemstoreSize();
 
     /*------------------------------------------------------------------------------*/
@@ -536,14 +506,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
     // items in CF1/2
-    assertEquals(
-        totalMemstoreSizePhaseIII
-            + 1 * DefaultMemStore.DEEP_OVERHEAD
-            + 2 * CompactingMemStore.DEEP_OVERHEAD
-            + 3 * MutableSegment.DEEP_OVERHEAD
-            + 2 * CompactionPipeline.ENTRY_OVERHEAD
-            + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM,
-        cf1MemstoreSizePhaseIII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII);
+    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
+        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
 
     /*------------------------------------------------------------------------------*/
     /* PHASE III - Flush */
@@ -556,9 +520,9 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /* PHASE IV - collect sizes */
     // Recalculate everything
-    long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();
     long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
@@ -567,9 +531,8 @@ public class TestWalAndCompactingMemStoreFlush {
     /* PHASE IV - validation */
     // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
     // CF2 should remain empty
-    assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseIV);
+    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseIV);
     // CF3 shouldn't have been touched.
     assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
     // the smallest LSN of CF3 shouldn't change
@@ -588,23 +551,20 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /* PHASE V - collect sizes */
     // Recalculate everything
-    long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
     long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long totalMemstoreSizePhaseV = region.getMemstoreSize();
 
     /*------------------------------------------------------------------------------*/
     /* PHASE V - validation */
-    assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf1MemstoreSizePhaseV);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseV);
-    assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf3MemstoreSizePhaseV);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSizePhaseV);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseV);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSizePhaseV);
    // The total memstore size should be zero
-    assertEquals(totalMemstoreSizePhaseV, 0);
+    assertEquals(0, totalMemstoreSizePhaseV);
     // Because there is nothing in any memstore the WAL's LSN should be -1
     assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM);
 
@@ -626,9 +586,9 @@ public class TestWalAndCompactingMemStoreFlush {
       region.put(createPut(5, i));
     }
 
-    long cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
-    long cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
-    long cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
+    MemstoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getSizeOfMemStore();
+    MemstoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getSizeOfMemStore();
 
     /*------------------------------------------------------------------------------*/
     /* PHASE VI - Flush */
@@ -639,13 +599,13 @@ public class TestWalAndCompactingMemStoreFlush {
     // Since we won't find any CF above the threshold, and hence no specific
     // store to flush, we should flush all the memstores
     // Also compacted memstores are flushed to disk, but not entirely emptied
-    long cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
-    long cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
-    long cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
+    MemstoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getSizeOfMemStore();
+    MemstoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getSizeOfMemStore();
 
-    assertTrue(cf1ActiveSizePhaseVII < cf1ActiveSizePhaseVI);
-    assertTrue(cf3ActiveSizePhaseVII < cf3ActiveSizePhaseVI);
-    assertTrue(cf5ActiveSizePhaseVII < cf5ActiveSizePhaseVI);
+    assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize());
+    assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize());
+    assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize());
 
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
@@ -654,10 +614,10 @@ public class TestWalAndCompactingMemStoreFlush {
   public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
-    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
         .getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 *
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 *
         1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
     // set memstore to do data compaction and not to use the speculative scan
@@ -683,14 +643,14 @@ public class TestWalAndCompactingMemStoreFlush {
     long totalMemstoreSize = region.getMemstoreSize();
 
     // Find the sizes of the memstores of each CF.
-    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
 
     // Some other sanity checks.
-    assertTrue(cf1MemstoreSizePhaseI > 0);
-    assertTrue(cf2MemstoreSizePhaseI > 0);
-    assertTrue(cf3MemstoreSizePhaseI > 0);
+    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
@@ -699,10 +659,8 @@ public class TestWalAndCompactingMemStoreFlush {
         " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
         " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
         " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
-    assertEquals(msg,
-        totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
-            + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
-        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
     // Flush!
     CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
@@ -711,7 +669,7 @@ public class TestWalAndCompactingMemStoreFlush {
     cms3.flushInMemory();
     region.flush(false);
 
-    long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
+    MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
 
     long smallestSeqInRegionCurrentMemstorePhaseII =
         region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
@@ -720,8 +678,7 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
 
     // CF2 should have been cleared
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseII);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
 
     String s = "\n\n----------------------------------\n"
         + "Upon initial insert and flush, LSN of CF1 is:"
@@ -816,23 +773,19 @@ public class TestWalAndCompactingMemStoreFlush {
     long totalMemstoreSize = region.getMemstoreSize();
 
     // Find the sizes of the memstores of each CF.
-    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
 
     // Some other sanity checks.
-    assertTrue(cf1MemstoreSizePhaseI > 0);
-    assertTrue(cf2MemstoreSizePhaseI > 0);
-    assertTrue(cf3MemstoreSizePhaseI > 0);
+    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
+    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    assertEquals(
-        totalMemstoreSize
-            + 1 * DefaultMemStore.DEEP_OVERHEAD
-            + 2 * CompactingMemStore.DEEP_OVERHEAD
-            + 3 * MutableSegment.DEEP_OVERHEAD,
-        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
 
     // Flush!
     ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
@@ -848,7 +801,7 @@ public class TestWalAndCompactingMemStoreFlush {
     }
     region.flush(false);
 
-    long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
+    MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
 
     long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL()
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
@@ -857,8 +810,7 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
 
     // CF2 should have been cleared
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-        cf2MemstoreSizePhaseII);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
 
     // Add same entries to compact them later
     for (int i = 1; i <= 1200; i++) {

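The pattern running through the assertions above: an in-memory flush (flattening) or in-memory compaction reshapes how cells are held without dropping data, so the data size stays put while the heap overhead shrinks; only a flush to disk empties both. A sketch of the idiom, using only accessors that appear in the diff and assuming the region setup from the test:

    MemstoreSize before = region.getStore(FAMILY1).getSizeOfMemStore();
    // ... trigger an in-memory flush/flatten of FAMILY1, as the test does ...
    MemstoreSize after = region.getStore(FAMILY1).getSizeOfMemStore();

    // Flattening keeps every cell but rewrites the segment into a leaner
    // layout: same data, less per-cell bookkeeping.
    assertEquals(before.getDataSize(), after.getDataSize());
    assertTrue(after.getHeapOverhead() < before.getHeapOverhead());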
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index d3bbaa5..f9d962c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.MemstoreSize;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -550,7 +551,7 @@ public abstract class AbstractTestWALReplay {
     final Configuration newConf = HBaseConfiguration.create(this.conf);
     User user = HBaseTestingUtility.getDifferentUser(newConf,
       tableName.getNameAsString());
-    user.runAs(new PrivilegedExceptionAction() {
+    user.runAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {
         runWALSplit(newConf);
@@ -560,10 +561,9 @@ public abstract class AbstractTestWALReplay {
         final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
         HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
           @Override
-          protected boolean restoreEdit(HStore s, Cell cell) {
-            boolean b = super.restoreEdit(s, cell);
+          protected void restoreEdit(HStore s, Cell cell, MemstoreSize memstoreSize) {
+            super.restoreEdit(s, cell, memstoreSize);
             countOfRestoredEdits.incrementAndGet();
-            return b;
           }
         };
         long seqid3 = region3.initialize();

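Note the shape change in restoreEdit: the size delta no longer travels back through the return value but accumulates into the MemstoreSize argument. A hedged sketch of the calling pattern this implies, from the perspective of HRegion's replay path; the no-arg constructor and the loop variables are assumptions, not shown in this mail:

    // Hypothetical replay loop around the new hook.
    MemstoreSize memstoreSize = new MemstoreSize(); // assumed no-arg constructor
    for (Cell cell : editsFromWal) {                // editsFromWal: assumed
      restoreEdit(store, cell, memstoreSize);       // each call adds its delta
    }
    // Both components of the replayed size are then available at once.
    long replayedData = memstoreSize.getDataSize();
    long replayedOverhead = memstoreSize.getHeapOverhead();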

[4/4] hbase git commit: HBASE-16747 Track memstore data size and heap overhead separately.

Posted by an...@apache.org.
HBASE-16747 Track memstore data size and heap overhead separately.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba6d9523
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba6d9523
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba6d9523

Branch: refs/heads/master
Commit: ba6d95232401ce533fb8c121ade7f4a864d06f12
Parents: 6127753
Author: anoopsamjohn <an...@gmail.com>
Authored: Sun Oct 30 12:20:46 2016 +0530
Committer: anoopsamjohn <an...@gmail.com>
Committed: Sun Oct 30 12:20:46 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/CellUtil.java  |   6 +
 .../org/apache/hadoop/hbase/ExtendedCell.java   |   5 +
 .../java/org/apache/hadoop/hbase/KeyValue.java  |  22 +-
 .../apache/hadoop/hbase/OffheapKeyValue.java    |   9 +-
 .../apache/hadoop/hbase/SizeCachedKeyValue.java |  11 +-
 .../io/encoding/BufferedDataBlockEncoder.java   |  10 +
 .../hbase/mob/DefaultMobStoreFlusher.java       |   2 +-
 .../hbase/regionserver/AbstractMemStore.java    | 209 ++-------------
 .../hbase/regionserver/CompactingMemStore.java  |  51 ++--
 .../hbase/regionserver/CompactionPipeline.java  |  43 +--
 .../hbase/regionserver/DefaultMemStore.java     |  39 ++-
 .../hbase/regionserver/DefaultStoreFlusher.java |   2 +-
 .../regionserver/FlushLargeStoresPolicy.java    |   5 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 250 ++++++++++--------
 .../hadoop/hbase/regionserver/HStore.java       |  92 +++----
 .../hbase/regionserver/HeapMemoryManager.java   |   6 +-
 .../hbase/regionserver/ImmutableSegment.java    |  46 ++--
 .../hadoop/hbase/regionserver/MemStore.java     |  57 ++--
 .../hbase/regionserver/MemStoreFlusher.java     |  29 +-
 .../hbase/regionserver/MemStoreSnapshot.java    |  14 +-
 .../hadoop/hbase/regionserver/MemstoreSize.java |  91 +++++++
 .../MetricsRegionServerWrapperImpl.java         |   2 +-
 .../regionserver/MetricsRegionWrapperImpl.java  |   2 +-
 .../hbase/regionserver/MutableSegment.java      |  67 ++++-
 .../hadoop/hbase/regionserver/Region.java       |   6 +-
 .../regionserver/RegionServerAccounting.java    |  54 ++--
 .../regionserver/RegionServicesForStores.java   |   9 +-
 .../hadoop/hbase/regionserver/Segment.java      |  71 +++--
 .../apache/hadoop/hbase/regionserver/Store.java |  23 ++
 .../hadoop/hbase/client/TestClientPushback.java |   6 +-
 .../regionserver/TestCompactingMemStore.java    | 263 +++++++++----------
 .../TestCompactingToCellArrayMapMemStore.java   | 131 +++++----
 .../hbase/regionserver/TestDefaultMemStore.java | 174 +++++-------
 .../hbase/regionserver/TestHMobStore.java       |  87 +++---
 .../hadoop/hbase/regionserver/TestHRegion.java  |  19 +-
 .../regionserver/TestHRegionReplayEvents.java   |   4 +-
 .../regionserver/TestMemStoreChunkPool.java     |  24 +-
 .../regionserver/TestPerColumnFamilyFlush.java  | 104 ++++----
 .../TestRegionMergeTransaction.java             |   5 +-
 .../regionserver/TestReversibleScanners.java    |   6 +-
 .../regionserver/TestSplitTransaction.java      |   5 +-
 .../hadoop/hbase/regionserver/TestStore.java    | 258 +++---------------
 .../TestWalAndCompactingMemStoreFlush.java      | 252 +++++++-----------
 .../regionserver/wal/AbstractTestWALReplay.java |   8 +-
 44 files changed, 1225 insertions(+), 1354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 7988352..484eebd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -381,6 +381,7 @@ public final class CellUtil {
   private static class TagRewriteCell implements ExtendedCell {
     protected Cell cell;
     protected byte[] tags;
+    private static final long HEAP_SIZE_OVERHEAD = 2 * ClassSize.REFERENCE + ClassSize.ARRAY;
 
     /**
      * @param cell The original Cell which it rewrites
@@ -552,6 +553,11 @@ public final class CellUtil {
       offset = Bytes.putAsShort(buf, offset, tagsLen);
       System.arraycopy(this.tags, 0, buf, offset, tagsLen);
     }
+
+    @Override
+    public long heapOverhead() {
+      return ((ExtendedCell) this.cell).heapOverhead() + HEAP_SIZE_OVERHEAD;
+    }
   }
 
   /**

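For a wrapper cell the overhead composes: TagRewriteCell reports whatever the wrapped cell reports, plus its own two references (the cell and the tags array) and the array header. Purely for illustration, if ClassSize.REFERENCE were 8 bytes and ClassSize.ARRAY 16 (assumed values, not taken from this patch), HEAP_SIZE_OVERHEAD would come to 2 * 8 + 16 = 32 bytes on top of the wrapped cell's own heapOverhead().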
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
index 420a5f9..f60da14 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
@@ -66,4 +66,9 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
    * @param offset The offset within buffer, to write the Cell.
    */
   void write(byte[] buf, int offset);
+
+  /**
+   * @return The heap size overhead associated with this Cell.
+   */
+  long heapOverhead();
 }
\ No newline at end of file

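The contract the implementations below follow: heapSize() is a cell's total JVM-heap footprint, overhead plus whatever data the cell keeps on heap, while the new heapOverhead() isolates the fixed bookkeeping portion that exists regardless of the data. Keeping the two apart is what allows the memstore to account for data size and heap overhead separately.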
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index f9a621a..b00ca1b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -85,6 +85,11 @@ public class KeyValue implements ExtendedCell {
 
   private static final Log LOG = LogFactory.getLog(KeyValue.class);
 
+  public static final long FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself
+      ClassSize.REFERENCE + // pointer to "bytes"
+      2 * Bytes.SIZEOF_INT + // offset, length
+      Bytes.SIZEOF_LONG;// memstoreTS
+
   /**
    * Colon character in UTF-8
    */
@@ -2603,12 +2608,7 @@ public class KeyValue implements ExtendedCell {
    */
   @Override
   public long heapSize() {
-    int sum = 0;
-    sum += ClassSize.OBJECT;// the KeyValue object itself
-    sum += ClassSize.REFERENCE;// pointer to "bytes"
-    sum += 2 * Bytes.SIZEOF_INT;// offset, length
-    sum += Bytes.SIZEOF_LONG;// memstoreTS
-
+    long sum = FIXED_OVERHEAD;
     /*
      * Deep object overhead for this KV consists of two parts. The first part is the KV object
      * itself, while the second part is the backing byte[]. We will only count the array overhead
@@ -2812,5 +2812,15 @@ public class KeyValue implements ExtendedCell {
       // of Cell to be returned back over the RPC
       throw new IllegalStateException("A reader should never return this type of a Cell");
     }
+
+    @Override
+    public long heapOverhead() {
+      return super.heapOverhead() + Bytes.SIZEOF_SHORT;
+    }
+  }
+
+  @Override
+  public long heapOverhead() {
+    return FIXED_OVERHEAD;
   }
 }

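Hoisting FIXED_OVERHEAD out of heapSize() lets heapOverhead() return it directly. As a worked example under assumed ClassSize values (ClassSize.OBJECT = 16 and ClassSize.REFERENCE = 8 on a 64-bit JVM; Bytes.SIZEOF_INT = 4 and Bytes.SIZEOF_LONG = 8 are fixed), FIXED_OVERHEAD = 16 + 8 + 2 * 4 + 8 = 40 bytes per KeyValue, independent of how large the backing byte[] is; heapSize() then adds the array cost on top.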
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index ae2496b..2165362 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -42,7 +42,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
   private final boolean hasTags;
   // TODO : See if famLen can be cached or not?
 
-  private static final int FIXED_HEAP_SIZE_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
+  private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
       + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT
       + Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_LONG;
 
@@ -235,7 +235,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
 
   @Override
   public long heapSize() {
-    return ClassSize.align(FIXED_HEAP_SIZE_OVERHEAD + ClassSize.align(length));
+    return ClassSize.align(FIXED_OVERHEAD + ClassSize.align(length));
   }
 
   @Override
@@ -276,4 +276,9 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
     // TODO when doing HBASE-15179
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public long heapOverhead() {
+    return FIXED_OVERHEAD;
+  }
 }

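The off-heap case makes the split concrete: an OffheapKeyValue's bytes live outside the JVM heap, so its heapOverhead() is just the fixed object footprint, while the cell data itself is tracked in the data-size component. The rename from FIXED_HEAP_SIZE_OVERHEAD to FIXED_OVERHEAD also aligns the constant's name with its counterparts in KeyValue and SizeCachedKeyValue.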
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
index cccbae0..60c4cbf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS")
 public class SizeCachedKeyValue extends KeyValue {
-  private static final int HEAP_SIZE_OVERHEAD = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT;
+  // Overhead in this class alone. The parent's overhead is accounted for where needed by
+  // calls to the super methods.
+  private static final int FIXED_OVERHEAD = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT;
 
   private short rowLen;
   private int keyLen;
@@ -58,6 +60,11 @@ public class SizeCachedKeyValue extends KeyValue {
 
   @Override
   public long heapSize() {
-    return super.heapSize() + HEAP_SIZE_OVERHEAD;
+    return super.heapSize() + FIXED_OVERHEAD;
+  }
+
+  @Override
+  public long heapOverhead() {
+    return super.heapOverhead() + FIXED_OVERHEAD;
   }
 }
\ No newline at end of file

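Subclass overheads chain through super: SizeCachedKeyValue caches rowLen (a short) and keyLen (an int), so its own FIXED_OVERHEAD is Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT = 2 + 4 = 6 bytes, and heapOverhead() reports super.heapOverhead() + 6, i.e. 46 bytes under the illustrative KeyValue figure assumed above.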
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index edecd9a..216a82d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -468,6 +468,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
       // This is not used in actual flow. Throwing UnsupportedOperationException
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public long heapOverhead() {
+      return FIXED_OVERHEAD;
+    }
   }
 
   protected static class OffheapDecodedCell extends ByteBufferedCell implements ExtendedCell {
@@ -707,6 +712,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
       // This is not used in actual flow. Throwing UnsupportedOperationException
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public long heapOverhead() {
+      return FIXED_OVERHEAD;
+    }
   }
 
   protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index eb2564d..a5229b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -129,7 +129,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
       scanner.close();
     }
     LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
-        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
+        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) +
         ", hasBloomFilter=" + writer.hasGeneralBloom() +
         ", into tmp file " + writer.getPath());
     result.add(writer.getPath());

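Only the data-size component is meaningful in this log line: what the flusher writes to the HFile is cell data, while the heap-overhead component is JVM bookkeeping that never leaves the process, so MemStoreSnapshot's getSize() gives way to an explicit getDataSize().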
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 5544251..a4ea3ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
@@ -30,9 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.ShareableMemory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
@@ -78,18 +73,6 @@ public abstract class AbstractMemStore implements MemStore {
     this.timeOfOldestEdit = Long.MAX_VALUE;
   }
 
-  /*
-  * Calculate how the MemStore size has changed.  Includes overhead of the
-  * backing Map.
-  * @param cell
-  * @param notPresent True if the cell was NOT present in the set.
-  * @return change in size
-  */
-  static long heapSizeChange(final Cell cell, final boolean notPresent) {
-    return notPresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
-        + CellUtil.estimatedHeapSizeOf(cell)) : 0;
-  }
-
   /**
    * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
    * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
@@ -98,22 +81,14 @@ public abstract class AbstractMemStore implements MemStore {
   public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
 
   @Override
-  public long add(Iterable<Cell> cells) {
-    long size = 0;
+  public void add(Iterable<Cell> cells, MemstoreSize memstoreSize) {
     for (Cell cell : cells) {
-      size += add(cell);
+      add(cell, memstoreSize);
     }
-    return size;
   }
-  
-  /**
-   * Write an update
-   * @param cell the cell to be added
-   * @return approximate size of the passed cell & newly added cell which maybe different than the
-   *         passed-in cell
-   */
+
   @Override
-  public long add(Cell cell) {
+  public void add(Cell cell, MemstoreSize memstoreSize) {
     Cell toAdd = maybeCloneWithAllocator(cell);
     boolean mslabUsed = (toAdd != cell);
     // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
@@ -128,7 +103,7 @@ public abstract class AbstractMemStore implements MemStore {
     if (!mslabUsed) {
       toAdd = deepCopyIfNeeded(toAdd);
     }
-    return internalAdd(toAdd, mslabUsed);
+    internalAdd(toAdd, mslabUsed, memstoreSize);
   }
 
   private static Cell deepCopyIfNeeded(Cell cell) {
@@ -141,31 +116,11 @@ public abstract class AbstractMemStore implements MemStore {
     return cell;
   }
 
-  /**
-   * Update or insert the specified Cells.
-   * <p>
-   * For each Cell, insert into MemStore.  This will atomically upsert the
-   * value for that row/family/qualifier.  If a Cell did already exist,
-   * it will then be removed.
-   * <p>
-   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
-   * be immediately visible.  May want to change this so it is atomic across
-   * all Cells.
-   * <p>
-   * This is called under row lock, so Get operations will still see updates
-   * atomically.  Scans will only see each Cell update as atomic.
-   *
-   * @param cells the cells to be updated
-   * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return change in memstore size
-   */
   @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) {
-    long size = 0;
+  public void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize) {
     for (Cell cell : cells) {
-      size += upsert(cell, readpoint);
+      upsert(cell, readpoint, memstoreSize);
     }
-    return size;
   }
 
   /**
@@ -176,18 +131,6 @@ public abstract class AbstractMemStore implements MemStore {
     return timeOfOldestEdit;
   }
 
-
-  /**
-   * Write a delete
-   * @param deleteCell the cell to be deleted
-   * @return approximate size of the passed key and value.
-   */
-  @Override
-  public long delete(Cell deleteCell) {
-    // Delete operation just adds the delete marker cell coming here.
-    return add(deleteCell);
-  }
-
   /**
    * The passed snapshot was successfully persisted; it can be let go.
    * @param id Id of the snapshot to clean out.
@@ -210,18 +153,9 @@ public abstract class AbstractMemStore implements MemStore {
     oldSnapshot.close();
   }
 
-  /**
-   * Get the entire heap usage for this MemStore not including keys in the
-   * snapshot.
-   */
-  @Override
-  public long heapSize() {
-    return size();
-  }
-
   @Override
-  public long getSnapshotSize() {
-    return this.snapshot.keySize();
+  public MemstoreSize getSnapshotSize() {
+    return new MemstoreSize(this.snapshot.keySize(), this.snapshot.heapOverhead());
   }
 
   @Override
@@ -249,7 +183,7 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
 
-  /**
+  /*
    * Inserts the specified Cell into MemStore and deletes any existing
    * versions of the same row/family/qualifier as the specified Cell.
    * <p>
@@ -262,9 +196,9 @@ public abstract class AbstractMemStore implements MemStore {
    *
    * @param cell the cell to be updated
    * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return change in size of MemStore
+   * @param memstoreSize accumulator that is updated with the data size and heap overhead change
    */
-  private long upsert(Cell cell, long readpoint) {
+  private void upsert(Cell cell, long readpoint, MemstoreSize memstoreSize) {
     // Add the Cell to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
     // and (b) cannot safely use the MSLAB here without potentially
@@ -275,50 +209,9 @@ public abstract class AbstractMemStore implements MemStore {
     // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
     // prevent it from getting GCed.
     cell = deepCopyIfNeeded(cell);
-    long addedSize = internalAdd(cell, false);
-
-    // Get the Cells for the row/family/qualifier regardless of timestamp.
-    // For this case we want to clean up any other puts
-    Cell firstCell = CellUtil.createFirstOnRow(
-        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-    SortedSet<Cell> ss = active.tailSet(firstCell);
-    Iterator<Cell> it = ss.iterator();
-    // versions visible to oldest scanner
-    int versionsVisible = 0;
-    while (it.hasNext()) {
-      Cell cur = it.next();
-
-      if (cell == cur) {
-        // ignore the one just put in
-        continue;
-      }
-      // check that this is the row and column we are interested in, otherwise bail
-      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
-        // only remove Puts that concurrent scanners cannot possibly see
-        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
-            cur.getSequenceId() <= readpoint) {
-          if (versionsVisible >= 1) {
-            // if we get here we have seen at least one version visible to the oldest scanner,
-            // which means we can prove that no scanner will see this version
-
-            // false means there was a change, so give us the size.
-            long delta = heapSizeChange(cur, true);
-            addedSize -= delta;
-            active.incSize(-delta);
-            it.remove();
-            setOldestEditTimeToNow();
-          } else {
-            versionsVisible++;
-          }
-        }
-      } else {
-        // past the row or column, done
-        break;
-      }
-    }
-    return addedSize;
+    this.active.upsert(cell, readpoint, memstoreSize);
+    setOldestEditTimeToNow();
+    checkActiveSize();
   }
 
   /*
@@ -359,75 +252,23 @@ public abstract class AbstractMemStore implements MemStore {
     return result;
   }
 
-  /**
-   * Given the specs of a column, update it, first by inserting a new record,
-   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
-   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
-   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
-   * get the new value, or the old value and all readers will eventually only see the new
-   * value after the old was removed.
-   */
-  @VisibleForTesting
-  @Override
-  public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long newValue, long now) {
-    Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
-    // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-    Cell snc = snapshot.getFirstAfter(firstCell);
-    if(snc != null) {
-      // is there a matching Cell in the snapshot?
-      if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
-        if (snc.getTimestamp() == now) {
-          now += 1;
-        }
-      }
-    }
-    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
-    // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
-    // so we cant add the new Cell w/o knowing what's there already, but we also
-    // want to take this chance to delete some cells. So two loops (sad)
-
-    SortedSet<Cell> ss = this.active.tailSet(firstCell);
-    for (Cell cell : ss) {
-      // if this isnt the row we are interested in, then bail:
-      if (!CellUtil.matchingColumn(cell, family, qualifier)
-          || !CellUtil.matchingRow(cell, firstCell)) {
-        break; // rows dont match, bail.
-      }
-
-      // if the qualifier matches and it's a put, just RM it out of the active.
-      if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
-          cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
-        now = cell.getTimestamp();
-      }
-    }
-
-    // create or update (upsert) a new Cell with
-    // 'now' and a 0 memstoreTS == immediately visible
-    List<Cell> cells = new ArrayList<Cell>(1);
-    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
-    return upsert(cells, 1L);
-  }
-
   private Cell maybeCloneWithAllocator(Cell cell) {
     return active.maybeCloneWithAllocator(cell);
   }
 
-  /**
+  /*
    * Internal version of add() that doesn't clone Cells with the
    * allocator, and doesn't take the lock.
    *
    * Callers should ensure they already have the read lock taken
    * @param toAdd the cell to add
    * @param mslabUsed whether using MSLAB
-   * @return the heap size change in bytes
+   * @param memstoreSize accumulator updated with the data size and heap overhead added by this cell
    */
-  private long internalAdd(final Cell toAdd, final boolean mslabUsed) {
-    long s = active.add(toAdd, mslabUsed);
+  private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemstoreSize memstoreSize) {
+    active.add(toAdd, mslabUsed, memstoreSize);
     setOldestEditTimeToNow();
     checkActiveSize();
-    return s;
   }
 
   private void setOldestEditTimeToNow() {
@@ -437,11 +278,15 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
   /**
-   * @return The size of the active segment. Means sum of all cell's size.
+   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
    */
-  protected long keySize() {
-    return this.active.keySize();
-  }
+  protected abstract long keySize();
+
+  /**
+   * @return The total heap overhead of cells in this memstore. We will not consider cells in the
+   *         snapshot
+   */
+  protected abstract long heapOverhead();
 
   protected CellComparator getComparator() {
     return comparator;
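
For readers following the new accounting, here is a minimal sketch of the
accumulator threaded through the calls above. The real MemstoreSize class is
added elsewhere in this commit; the shape below is inferred only from the
calls visible in these hunks and is illustrative, not the committed source:

    // Sketch only: tracks cell-data bytes and JVM heap overhead separately,
    // mirroring the incMemstoreSize/decMemstoreSize/getDataSize calls above.
    public class MemstoreSize {
      public static final MemstoreSize EMPTY_SIZE = new MemstoreSize();

      private long dataSize;      // serialized key/value bytes
      private long heapOverhead;  // object headers, references, padding

      public MemstoreSize() {
        this(0L, 0L);
      }

      public MemstoreSize(long dataSize, long heapOverhead) {
        this.dataSize = dataSize;
        this.heapOverhead = heapOverhead;
      }

      public void incMemstoreSize(long dataSizeDelta, long heapOverheadDelta) {
        this.dataSize += dataSizeDelta;
        this.heapOverhead += heapOverheadDelta;
      }

      public void incMemstoreSize(MemstoreSize delta) {
        incMemstoreSize(delta.dataSize, delta.heapOverhead);
      }

      public void decMemstoreSize(MemstoreSize delta) {
        incMemstoreSize(-delta.dataSize, -delta.heapOverhead);
      }

      public long getDataSize() {
        return dataSize;
      }

      public long getHeapOverhead() {
        return heapOverhead;
      }
    }

Mutating a caller-owned accumulator in place, instead of returning long deltas
from every add/upsert call, is what lets a whole batch be accounted exactly
once in a finally block further down in HRegion.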

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 1ecd868..a7eb19e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -100,19 +100,19 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   /**
-   * @return Total memory occupied by this MemStore. This includes active segment size and heap size
-   *         overhead of this memstore but won't include any size occupied by the snapshot. We
-   *         assume the snapshot will get cleared soon. This is not thread safe and the memstore may
-   *         be changed while computing its size. It is the responsibility of the caller to make
-   *         sure this doesn't happen.
+   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
+   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
+   *         the memstore may be changed while computing its size. It is the responsibility of the
+   *         caller to make sure this doesn't happen.
    */
   @Override
-  public long size() {
-    long res = DEEP_OVERHEAD + this.active.size();
+  public MemstoreSize size() {
+    MemstoreSize memstoreSize = new MemstoreSize();
+    memstoreSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
     for (Segment item : pipeline.getSegments()) {
-      res += CompactionPipeline.ENTRY_OVERHEAD + item.size();
+      memstoreSize.incMemstoreSize(item.keySize(), item.heapOverhead());
     }
-    return res;
+    return memstoreSize;
   }
 
   /**
@@ -163,13 +163,34 @@ public class CompactingMemStore extends AbstractMemStore {
    * @return size of data that is going to be flushed
    */
   @Override
-  public long getFlushableSize() {
-    long snapshotSize = getSnapshotSize();
-    if (snapshotSize == 0) {
+  public MemstoreSize getFlushableSize() {
+    MemstoreSize snapshotSize = getSnapshotSize();
+    if (snapshotSize.getDataSize() == 0) {
       // if snapshot is empty the tail of the pipeline is flushed
       snapshotSize = pipeline.getTailSize();
     }
-    return snapshotSize > 0 ? snapshotSize : keySize();
+    return snapshotSize.getDataSize() > 0 ? snapshotSize
+        : new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
+  }
+
+  @Override
+  protected long keySize() {
+    // Need to consider keySize of all segments in pipeline and active
+    long k = this.active.keySize();
+    for (Segment segment : this.pipeline.getSegments()) {
+      k += segment.keySize();
+    }
+    return k;
+  }
+
+  @Override
+  protected long heapOverhead() {
+    // Need to consider heapOverhead of all segments in pipeline and active
+    long h = this.active.heapOverhead();
+    for (Segment segment : this.pipeline.getSegments()) {
+      h += segment.heapOverhead();
+    }
+    return h;
   }
 
   @Override
@@ -318,7 +339,7 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   private boolean shouldFlushInMemory() {
-    if (this.active.size() > inmemoryFlushSize) { // size above flush threshold
+    if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
        // the inMemoryFlushInProgress is CASed to be true here in order to mutually exclude
         // the insert of the active into the compaction pipeline
         return (inMemoryFlushInProgress.compareAndSet(false,true));
@@ -419,7 +440,7 @@ public class CompactingMemStore extends AbstractMemStore {
 
   // debug method
   public void debug() {
-    String msg = "active size=" + this.active.size();
+    String msg = "active size=" + this.active.keySize();
     msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize;
     msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
     msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 28c9383..6676170 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -115,19 +115,30 @@ public class CompactionPipeline {
     }
     if (region != null) {
       // update the global memstore size counter
-      long suffixSize = getSegmentsKeySize(suffix);
-      long newSize = segment.keySize();
-      long delta = suffixSize - newSize;
-      assert ( closeSuffix || delta>0 ); // sanity check
-      long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
+      long suffixDataSize = getSegmentsKeySize(suffix);
+      long newDataSize = segment.keySize();
+      long dataSizeDelta = suffixDataSize - newDataSize;
+      long suffixHeapOverhead = getSegmentsHeapOverhead(suffix);
+      long newHeapOverhead = segment.heapOverhead();
+      long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead;
+      region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize
-            + " globalMemstoreSize: " + globalMemstoreSize);
+        LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: "
+            + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead
+            + " compacted item heap overhead: " + newHeapOverhead);
       }
     }
     return true;
   }
 
+  private static long getSegmentsHeapOverhead(List<? extends Segment> list) {
+    long res = 0;
+    for (Segment segment : list) {
+      res += segment.heapOverhead();
+    }
+    return res;
+  }
+
   private static long getSegmentsKeySize(List<? extends Segment> list) {
     long res = 0;
     for (Segment segment : list) {
@@ -160,16 +171,12 @@ public class CompactionPipeline {
 
       for (ImmutableSegment s : pipeline) {
        // remember the old size in case this segment is going to be flattened
-        long sizeBeforeFlat = s.keySize();
-        long globalMemstoreSize = 0;
-        if (s.flatten()) {
+        MemstoreSize memstoreSize = new MemstoreSize();
+        if (s.flatten(memstoreSize)) {
           if(region != null) {
-            long sizeAfterFlat = s.keySize();
-            long delta = sizeBeforeFlat - sizeAfterFlat;
-            globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
+            region.addMemstoreSize(memstoreSize);
           }
-          LOG.debug("Compaction pipeline segment " + s + " was flattened; globalMemstoreSize: "
-              + globalMemstoreSize);
+          LOG.debug("Compaction pipeline segment " + s + " was flattened");
           return true;
         }
       }
@@ -203,9 +210,9 @@ public class CompactionPipeline {
     return minSequenceId;
   }
 
-  public long getTailSize() {
-    if (isEmpty()) return 0;
-    return pipeline.peekLast().keySize();
+  public MemstoreSize getTailSize() {
+    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
+    return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
   }
 
   private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
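
The swap accounting above can be read as two independent deltas, one per
counter. A small worked sketch, with made-up numbers for illustration only
(the variable name "compacted" is ours, standing for the replacement segment):

    // Sketch only: suffix = segments being replaced. Both deltas are normally
    // positive, so the region total shrinks by the space compaction reclaimed.
    long dataSizeDelta = getSegmentsKeySize(suffix) - compacted.keySize();
    long heapOverheadDelta = getSegmentsHeapOverhead(suffix) - compacted.heapOverhead();
    region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));

    // e.g. suffix: 3 MB data / 1.2 MB overhead; compacted: 2 MB / 0.5 MB
    // => region counters drop by 1 MB of data and 0.7 MB of overhead.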

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index b448b04..d4e6e12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -104,9 +104,20 @@ public class DefaultMemStore extends AbstractMemStore {
    * @return size of data that is going to be flushed from active set
    */
   @Override
-  public long getFlushableSize() {
-    long snapshotSize = getSnapshotSize();
-    return snapshotSize > 0 ? snapshotSize : keySize();
+  public MemstoreSize getFlushableSize() {
+    MemstoreSize snapshotSize = getSnapshotSize();
+    return snapshotSize.getDataSize() > 0 ? snapshotSize
+        : new MemstoreSize(keySize(), heapOverhead());
+  }
+
+  @Override
+  protected long keySize() {
+    return this.active.keySize();
+  }
+
+  @Override
+  protected long heapOverhead() {
+    return this.active.heapOverhead();
   }
 
   @Override
@@ -144,8 +155,8 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   @Override
-  public long size() {
-    return this.active.size() + DEEP_OVERHEAD;
+  public MemstoreSize size() {
+    return new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
   }
 
   /**
@@ -179,26 +190,30 @@ public class DefaultMemStore extends AbstractMemStore {
     LOG.info("vmInputArguments=" + runtime.getInputArguments());
     DefaultMemStore memstore1 = new DefaultMemStore();
     // TODO: x32 vs x64
-    long size = 0;
     final int count = 10000;
     byte [] fam = Bytes.toBytes("col");
     byte [] qf = Bytes.toBytes("umn");
     byte [] empty = new byte[0];
+    MemstoreSize memstoreSize = new MemstoreSize();
     for (int i = 0; i < count; i++) {
       // Give each its own ts
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize);
     }
-    LOG.info("memstore1 estimated size=" + size);
+    LOG.info("memstore1 estimated size="
+        + (memstoreSize.getDataSize() + memstoreSize.getHeapOverhead()));
     for (int i = 0; i < count; i++) {
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize);
     }
-    LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
+    LOG.info("memstore1 estimated size (2nd loading of same data)="
+        + (memstoreSize.getDataSize() + memstoreSize.getHeapOverhead()));
     // Make a variably sized memstore.
     DefaultMemStore memstore2 = new DefaultMemStore();
+    memstoreSize = new MemstoreSize();
     for (int i = 0; i < count; i++) {
-      size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]));
+      memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSize);
     }
-    LOG.info("memstore2 estimated size=" + size);
+    LOG.info("memstore2 estimated size="
+        + (memstoreSize.getDataSize() + memstoreSize.getHeapOverhead()));
     final int seconds = 30;
     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
     LOG.info("Exiting.");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 079501e..93837b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -89,7 +89,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       scanner.close();
     }
     LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
-        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
+        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) +
         ", hasBloomFilter=" + writer.hasGeneralBloom() +
         ", into tmp file " + writer.getPath());
     result.add(writer.getPath());
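
The "memsize" reported above is now the snapshot's data bytes only; heap
overhead stays out of the flush log since it never reaches the HFile. A tiny
usage sketch of the same formatter:

    // Sketch only: pretty-print flushed data bytes with binary prefixes.
    String pretty = StringUtils.TraditionalBinaryPrefix
        .long2String(snapshot.getDataSize(), "", 1);
    LOG.info("Flushed, memsize=" + pretty);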

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
index 49cb747..119fdb5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -78,11 +78,12 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
   }
 
   protected boolean shouldFlush(Store store) {
-    if (store.getMemStoreSize() > this.flushSizeLowerBound) {
+    if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
             region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
-            store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound);
+            store.getSizeOfMemStore().getDataSize() + " > lower bound="
+            + this.flushSizeLowerBound);
       }
       return true;
     }
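
Note the unit change hiding in shouldFlush above: the lower bound is now
compared against data size alone, so the bound keeps meaning "bytes of cell
data that would become HFile payload". A minimal sketch of per-family flush
selection under that reading (loop shape is ours, not the committed code):

    // Sketch only: pick column families whose data bytes exceed the bound.
    for (Store store : region.getStores()) {
      if (store.getSizeOfMemStore().getDataSize() > flushSizeLowerBound) {
        storesToFlush.add(store);
      }
    }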

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 99c389d..fff2e6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -259,7 +259,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // TODO: account for each registered handler in HeapSize computation
   private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
 
-  private final AtomicLong memstoreSize = new AtomicLong(0);
+  private final AtomicLong memstoreDataSize = new AtomicLong(0);// Track data size in all memstores
   private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
 
   // Debug possible data loss due to WAL off
@@ -506,23 +506,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     final FlushResult result; // indicating a failure result from prepare
     final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
     final TreeMap<byte[], List<Path>> committedFiles;
-    final TreeMap<byte[], Long> storeFlushableSize;
+    final TreeMap<byte[], MemstoreSize> storeFlushableSize;
     final long startTime;
     final long flushOpSeqId;
     final long flushedSeqId;
-    final long totalFlushableSize;
+    final MemstoreSize totalFlushableSize;
 
     /** Constructs an early exit case */
     PrepareFlushResult(FlushResult result, long flushSeqId) {
-      this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
+      this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemstoreSize());
     }
 
     /** Constructs a successful prepare flush result */
     PrepareFlushResult(
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
       TreeMap<byte[], List<Path>> committedFiles,
-      TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
-      long flushedSeqId, long totalFlushableSize) {
+      TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId,
+      long flushedSeqId, MemstoreSize totalFlushableSize) {
       this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
         flushSeqId, flushedSeqId, totalFlushableSize);
     }
@@ -531,8 +531,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       FlushResult result,
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
       TreeMap<byte[], List<Path>> committedFiles,
-      TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
-      long flushedSeqId, long totalFlushableSize) {
+      TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId,
+      long flushedSeqId, MemstoreSize totalFlushableSize) {
       this.result = result;
       this.storeFlushCtxs = storeFlushCtxs;
       this.committedFiles = committedFiles;
@@ -1125,19 +1125,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * store
    * @return the size of memstore in this region
    */
-  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
+  public long addAndGetMemstoreSize(MemstoreSize memstoreSize) {
     if (this.rsAccounting != null) {
-      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
+      rsAccounting.incGlobalMemstoreSize(memstoreSize);
     }
-    long size = this.memstoreSize.addAndGet(memStoreSize);
+    long size = this.memstoreDataSize.addAndGet(memstoreSize.getDataSize());
+    checkNegativeMemstoreDataSize(size, memstoreSize.getDataSize());
+    return size;
+  }
+
+  public void decrMemstoreSize(MemstoreSize memstoreSize) {
+    if (this.rsAccounting != null) {
+      rsAccounting.decGlobalMemstoreSize(memstoreSize);
+    }
+    long size = this.memstoreDataSize.addAndGet(-memstoreSize.getDataSize());
+    checkNegativeMemstoreDataSize(size, -memstoreSize.getDataSize());
+  }
+
+  private void checkNegativeMemstoreDataSize(long memstoreDataSize, long delta) {
     // This is extremely bad if we make memstoreSize negative. Log as much info on the offending
     // caller as possible. (memStoreSize might be a negative value already -- freeing memory)
-    if (size < 0) {
+    if (memstoreDataSize < 0) {
       LOG.error("Asked to modify this region's (" + this.toString()
-      + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
-      + (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
+          + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
+          + (memstoreDataSize - delta) + ", delta=" + delta, new Exception());
     }
-    return size;
   }
 
   @Override
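
The add/decr pair above carries an invariant worth stating: every byte and
every unit of overhead added on the write path must eventually be removed by
exactly one flush, abort, or drop. A hedged usage sketch of that lifecycle,
built from the methods this patch introduces:

    // Sketch only: the write side publishes an accumulator once ...
    MemstoreSize written = new MemstoreSize();
    store.add(cell, written);                 // per-store delta recorded
    region.addAndGetMemstoreSize(written);    // region + RS counters go up

    // ... and the flush path later removes the snapshot it prepared.
    MemstoreSize flushable = store.getSizeToFlush();
    region.decrMemstoreSize(flushable);       // must mirror the earlier adds
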
@@ -1180,7 +1192,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public long getMemstoreSize() {
-    return memstoreSize.get();
+    return memstoreDataSize.get();
   }
 
   @Override
@@ -1490,7 +1502,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         int failedfFlushCount = 0;
         int flushCount = 0;
         long tmp = 0;
-        long remainingSize = this.memstoreSize.get();
+        long remainingSize = this.memstoreDataSize.get();
         while (remainingSize > 0) {
           try {
             internalFlushcache(status);
@@ -1499,7 +1511,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                   " (carrying snapshot?) " + this);
             }
             flushCount++;
-            tmp = this.memstoreSize.get();
+            tmp = this.memstoreDataSize.get();
             if (tmp >= remainingSize) {
               failedfFlushCount++;
             }
@@ -1534,8 +1546,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
         // close each store in parallel
         for (final Store store : stores.values()) {
-          long flushableSize = store.getFlushableSize();
-          if (!(abort || flushableSize == 0 || writestate.readOnly)) {
+          MemstoreSize flushableSize = store.getSizeToFlush();
+          if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
             if (getRegionServerServices() != null) {
               getRegionServerServices().abort("Assertion failed while closing store "
                 + getRegionInfo().getRegionNameAsString() + " " + store
@@ -1580,9 +1592,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       this.closed.set(true);
       if (!canFlush) {
-        addAndGetGlobalMemstoreSize(-memstoreSize.get());
-      } else if (memstoreSize.get() != 0) {
-        LOG.error("Memstore size is " + memstoreSize.get());
+        this.decrMemstoreSize(new MemstoreSize(memstoreDataSize.get(), getMemstoreHeapOverhead()));
+      } else if (memstoreDataSize.get() != 0) {
+        LOG.error("Memstore size is " + memstoreDataSize.get());
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-close hooks");
@@ -1605,6 +1617,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  private long getMemstoreHeapOverhead() {
+    long overhead = 0;
+    for (Store s : this.stores.values()) {
+      overhead += s.getSizeOfMemStore().getHeapOverhead();
+    }
+    return overhead;
+  }
+
   @Override
   public void waitForFlushesAndCompactions() {
     synchronized (writestate) {
@@ -1670,7 +1690,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     * @return True if its worth doing a flush before we put up the close flag.
     */
   private boolean worthPreFlushing() {
-    return this.memstoreSize.get() >
+    return this.memstoreDataSize.get() >
       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
   }
 
@@ -2246,12 +2266,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    // bulk loaded file between memory and existing hfiles. It wants a good sequenceId that belongs
    // to no other edit, one it can use to associate with the bulk load. Hence this little dance below
     // to go get one.
-    if (this.memstoreSize.get() <= 0) {
+    if (this.memstoreDataSize.get() <= 0) {
       // Take an update lock so no edits can come into memory just yet.
       this.updatesLock.writeLock().lock();
       WriteEntry writeEntry = null;
       try {
-        if (this.memstoreSize.get() <= 0) {
+        if (this.memstoreDataSize.get() <= 0) {
           // Presume that if there are still no edits in the memstore, then there are no edits for
           // this region out in the WAL subsystem so no need to do any trickery clearing out
           // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
@@ -2294,7 +2314,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
     status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
-    long totalFlushableSizeOfFlushableStores = 0;
+    MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
 
     Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
     for (Store store: storesToFlush) {
@@ -2305,8 +2325,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
         Bytes.BYTES_COMPARATOR);
-    TreeMap<byte[], Long> storeFlushableSize
-        = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    TreeMap<byte[], MemstoreSize> storeFlushableSize
+        = new TreeMap<byte[], MemstoreSize>(Bytes.BYTES_COMPARATOR);
     // The sequence id of this flush operation which is used to log FlushMarker and pass to
     // createFlushContext to use as the store file's sequence id. It can be in advance of edits
     // still in the memstore, edits that are in other column families yet to be flushed.
@@ -2338,10 +2358,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       for (Store s : storesToFlush) {
-        totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+        MemstoreSize flushableSize = s.getSizeToFlush();
+        totalSizeOfFlushableStores.incMemstoreSize(flushableSize);
         storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
         committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
-        storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
+        storeFlushableSize.put(s.getFamily().getName(), flushableSize);
       }
 
       // write the snapshot start to WAL
@@ -2364,11 +2385,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.updatesLock.writeLock().unlock();
     }
     String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " +
-        "flushsize=" + totalFlushableSizeOfFlushableStores;
+        "flushsize=" + totalSizeOfFlushableStores;
     status.setStatus(s);
     doSyncOfUnflushedWALChanges(wal, getRegionInfo());
     return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
-        flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
+        flushOpSeqId, flushedSeqId, totalSizeOfFlushableStores);
   }
 
   /**
@@ -2384,11 +2405,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       perCfExtras = new StringBuilder();
       for (Store store: storesToFlush) {
         perCfExtras.append("; ").append(store.getColumnFamilyName());
-        perCfExtras.append("=").append(StringUtils.byteDesc(store.getFlushableSize()));
+        perCfExtras.append("=")
+            .append(StringUtils.byteDesc(store.getSizeToFlush().getDataSize()));
       }
     }
     LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
-        " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
+        " column families, memstore=" + StringUtils.byteDesc(this.memstoreDataSize.get()) +
         ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
         ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
   }
@@ -2468,7 +2490,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long startTime = prepareResult.startTime;
     long flushOpSeqId = prepareResult.flushOpSeqId;
     long flushedSeqId = prepareResult.flushedSeqId;
-    long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
 
     String s = "Flushing stores of " + this;
     status.setStatus(s);
@@ -2504,14 +2525,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         committedFiles.put(storeName, storeCommittedFiles);
         // Flush committed no files, indicating flush is empty or flush was canceled
         if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
-          totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
+          MemstoreSize storeFlushableSize = prepareResult.storeFlushableSize.get(storeName);
+          prepareResult.totalFlushableSize.decMemstoreSize(storeFlushableSize);
         }
         flushedOutputFileSize += flush.getOutputFileSize();
       }
       storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
+      this.decrMemstoreSize(prepareResult.totalFlushableSize);
 
       if (wal != null) {
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
@@ -2581,10 +2603,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     long time = EnvironmentEdgeManager.currentTime() - startTime;
-    long memstoresize = this.memstoreSize.get();
+    long memstoresize = this.memstoreDataSize.get();
     String msg = "Finished memstore flush of ~"
-        + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
-        + totalFlushableSizeOfFlushableStores + ", currentsize="
+        + StringUtils.byteDesc(prepareResult.totalFlushableSize.getDataSize()) + "/"
+        + prepareResult.totalFlushableSize.getDataSize() + ", currentsize="
         + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
         + " for region " + this + " in " + time + "ms, sequenceid="
         + flushOpSeqId +  ", compaction requested=" + compactionRequested
@@ -2594,7 +2616,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     if (rsServices != null && rsServices.getMetrics() != null) {
       rsServices.getMetrics().updateFlush(time - startTime,
-        totalFlushableSizeOfFlushableStores, flushedOutputFileSize);
+          prepareResult.totalFlushableSize.getDataSize(), flushedOutputFileSize);
     }
 
     return new FlushResultImpl(compactionRequested ?
@@ -3026,10 +3048,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Change in size brought about by applying <code>batchOp</code>
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK",
-		  justification="Findbugs seems to be confused on this.")
+      justification="Findbugs seems to be confused on this.")
   @SuppressWarnings("unchecked")
   // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120
-  private long doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
+  private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
     boolean replay = batchOp.isInReplay();
     // Variable to note if all Put items are for the same CF -- metrics related
     boolean putsCfSetConsistent = true;
@@ -3055,7 +3077,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     int cellCount = 0;
     /** Keep track of the locks we hold so we can release them in finally clause */
     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
-    long addedSize = 0;
+    MemstoreSize memstoreSize = new MemstoreSize();
     try {
       // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
       int numReadyToWrite = 0;
@@ -3117,7 +3139,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
       if (numReadyToWrite <= 0) {
-        return 0L;
+        return;
       }
 
       for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {
@@ -3155,7 +3177,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         if (coprocessorHost.preBatchMutate(miniBatchOp)) {
-          return 0L;
+          return;
         } else {
           for (int i = firstIndex; i < lastIndexExclusive; i++) {
             if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
@@ -3303,7 +3325,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.updateSequenceId(familyMaps[i].values(),
             replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
         }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
+        applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
       }
 
       // STEP 6. Complete mvcc.
@@ -3355,11 +3377,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       success = true;
-      return addedSize;
     } finally {
       // Call complete rather than completeAndWait because we probably had error if walKey != null
       if (writeEntry != null) mvcc.complete(writeEntry);
-      this.addAndGetGlobalMemstoreSize(addedSize);
+      this.addAndGetMemstoreSize(memstoreSize);
       if (locked) {
         this.updatesLock.readLock().unlock();
       }
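
The batching convention in doMiniBatchMutate is the heart of this refactor: a
single accumulator rides through the whole mini-batch and is applied to the
region exactly once, in the finally block, instead of summing per-cell long
return values. A condensed sketch of that shape:

    // Sketch only: cells already written to the memstore must be counted
    // whether or not the batch as a whole succeeds.
    MemstoreSize memstoreSize = new MemstoreSize();
    try {
      for (Map<byte[], List<Cell>> familyMap : familyMaps) {
        applyFamilyMapToMemstore(familyMap, memstoreSize); // mutates accumulator
      }
    } finally {
      this.addAndGetMemstoreSize(memstoreSize);
    }
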
@@ -3778,7 +3799,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
 
-    if (this.memstoreSize.get() > this.blockingMemStoreSize) {
+    if (this.memstoreDataSize.get() > this.blockingMemStoreSize) {
       blockedRequestsCount.increment();
       requestFlush();
       throw new RegionTooBusyException("Above memstore limit, " +
@@ -3786,7 +3807,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.getRegionInfo().getRegionNameAsString()) +
           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
           this.getRegionServerServices().getServerName()) +
-          ", memstoreSize=" + memstoreSize.get() +
+          ", memstoreSize=" + memstoreDataSize.get() +
           ", blockingMemStoreSize=" + blockingMemStoreSize);
     }
   }
@@ -3831,57 +3852,53 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     doBatchMutate(p);
   }
 
-  /**
+  /*
    * Atomically apply the given map of family->edits to the memstore.
    * This handles the consistency control on its own, but the caller
    * should already have locked updatesLock.readLock(). This also does
    * <b>not</b> check the families for validity.
    *
    * @param familyMap Map of Cells by family
-   * @return the additional memory usage of the memstore caused by the new entries.
+   * @param memstoreSize accumulator updated with the size of all edits applied to the memstore
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap)
-  throws IOException {
-    long size = 0;
+  private void applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
+      MemstoreSize memstoreSize) throws IOException {
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
       byte[] family = e.getKey();
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
-      size += applyToMemstore(getStore(family), cells, false);
+      applyToMemstore(getStore(family), cells, false, memstoreSize);
     }
-    return size;
   }
 
-  /**
+  /*
    * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
    *  set; when set we will run operations that make sense in the increment/append scenario but
    *  that do not make sense otherwise.
-   * @return Memstore change in size on insert of these Cells.
    * @see #applyToMemstore(Store, Cell, long)
    */
-  private long applyToMemstore(final Store store, final List<Cell> cells, final boolean delta)
-  throws IOException {
+  private void applyToMemstore(final Store store, final List<Cell> cells, final boolean delta,
+      MemstoreSize memstoreSize) throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
     if (upsert) {
-      return ((HStore) store).upsert(cells, getSmallestReadPoint());
+      ((HStore) store).upsert(cells, getSmallestReadPoint(), memstoreSize);
     } else {
-      return ((HStore) store).add(cells);
+      ((HStore) store).add(cells, memstoreSize);
     }
   }
 
-  /**
-   * @return Memstore change in size on insert of these Cells.
+  /*
    * @see #applyToMemstore(Store, List, boolean, boolean, long)
    */
-  private long applyToMemstore(final Store store, final Cell cell)
+  private void applyToMemstore(final Store store, final Cell cell, MemstoreSize memstoreSize)
   throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     if (store == null) {
       checkFamily(CellUtil.cloneFamily(cell));
       // Unreachable because checkFamily will throw exception
     }
-    return ((HStore) store).add(cell);
+    ((HStore) store).add(cell, memstoreSize);
   }
 
   @Override
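
On the upsert-vs-add branch above: upsert is only taken for delta operations
on families capped at one version, because upsert eagerly removes older cells
below the read point, which is only safe when no reader can legitimately ask
for a second version. Restated as a sketch:

    // Sketch only: delta == increment/append path.
    boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
    if (upsert) {
      ((HStore) store).upsert(cells, getSmallestReadPoint(), memstoreSize);
    } else {
      ((HStore) store).add(cells, memstoreSize);
    }
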
@@ -4200,6 +4217,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           boolean flush = false;
+          MemstoreSize memstoreSize = new MemstoreSize();
           for (Cell cell: val.getCells()) {
             // Check this edit is for me. Also, guard against writing the special
             // METACOLUMN info such as HBASE::CACHEFLUSH entries
@@ -4241,12 +4259,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
             CellUtil.setSequenceId(cell, currentReplaySeqId);
 
-            // Once we are over the limit, restoreEdit will keep returning true to
-            // flush -- but don't flush until we've played all the kvs that make up
-            // the WALEdit.
-            flush |= restoreEdit(store, cell);
+            restoreEdit(store, cell, memstoreSize);
             editsCount++;
           }
+          if (this.rsAccounting != null) {
+            rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(),
+                memstoreSize);
+          }
+          flush = isFlushSize(this.addAndGetMemstoreSize(memstoreSize));
           if (flush) {
             internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
           }
@@ -4555,7 +4575,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             replayFlushInStores(flush, prepareFlushResult, true);
 
             // Set down the memstore size by amount of flush.
-            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+            this.decrMemstoreSize(prepareFlushResult.totalFlushableSize);
 
             this.prepareFlushResult = null;
             writestate.flushing = false;
@@ -4588,7 +4608,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             replayFlushInStores(flush, prepareFlushResult, true);
 
             // Set down the memstore size by amount of flush.
-            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+            this.decrMemstoreSize(prepareFlushResult.totalFlushableSize);
 
             // Inspect the memstore contents to see whether the memstore contains only edits
             // with seqId smaller than the flush seqId. If so, we can discard those edits.
@@ -4691,8 +4711,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * if the memstore edits have seqNums smaller than the given seq id
    * @throws IOException
    */
-  private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
-    long totalFreedSize = 0;
+  private MemstoreSize dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+    MemstoreSize totalFreedSize = new MemstoreSize();
     this.updatesLock.writeLock().lock();
     try {
 
@@ -4706,10 +4726,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // Prepare flush (take a snapshot) and then abort (drop the snapshot)
         if (store == null) {
           for (Store s : stores.values()) {
-            totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
+            totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(s, currentSeqId));
           }
         } else {
-          totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
+          totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(store, currentSeqId));
         }
       } else {
         LOG.info(getRegionInfo().getEncodedName() + " : "
@@ -4722,13 +4742,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return totalFreedSize;
   }
 
-  private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
-    long snapshotSize = s.getFlushableSize();
-    this.addAndGetGlobalMemstoreSize(-snapshotSize);
+  private MemstoreSize doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId)
+      throws IOException {
+    MemstoreSize flushableSize = s.getSizeToFlush();
+    this.decrMemstoreSize(flushableSize);
     StoreFlushContext ctx = s.createFlushContext(currentSeqId);
     ctx.prepare();
     ctx.abort();
-    return snapshotSize;
+    return flushableSize;
   }
 
   private void replayWALFlushAbortMarker(FlushDescriptor flush) {
@@ -4841,9 +4862,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                   null : this.prepareFlushResult.storeFlushCtxs.get(family);
               if (ctx != null) {
-                long snapshotSize = store.getFlushableSize();
+                MemstoreSize snapshotSize = store.getSizeToFlush();
                 ctx.abort();
-                this.addAndGetGlobalMemstoreSize(-snapshotSize);
+                this.decrMemstoreSize(snapshotSize);
                 this.prepareFlushResult.storeFlushCtxs.remove(family);
               }
             }
@@ -4972,7 +4993,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           if (store == null) {
             continue;
           }
-          if (store.getSnapshotSize() > 0) {
+          if (store.getSizeOfSnapshot().getDataSize() > 0) {
             canDrop = false;
             break;
           }
@@ -5005,7 +5026,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           + "Refreshing store files to see whether we can free up memstore");
     }
 
-    long totalFreedSize = 0;
+    long totalFreedDataSize = 0;
 
     long smallestSeqIdInStores = Long.MAX_VALUE;
 
@@ -5035,11 +5056,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                     null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
                 if (ctx != null) {
-                  long snapshotSize = store.getFlushableSize();
+                  MemstoreSize snapshotSize = store.getSizeToFlush();
                   ctx.abort();
-                  this.addAndGetGlobalMemstoreSize(-snapshotSize);
+                  this.decrMemstoreSize(snapshotSize);
                   this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
-                  totalFreedSize += snapshotSize;
+                  totalFreedDataSize += snapshotSize.getDataSize();
                 }
               }
             }
@@ -5071,7 +5092,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!force) {
           for (Map.Entry<Store, Long> entry : map.entrySet()) {
             // Drop the memstore contents if they are now smaller than the latest seen flushed file
-            totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey());
+            totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
+                .getDataSize();
           }
         } else {
           synchronized (storeSeqIds) {
@@ -5085,7 +5107,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       synchronized (this) {
         notifyAll(); // FindBugs NN_NAKED_NOTIFY
       }
-      return totalFreedSize > 0;
+      return totalFreedDataSize > 0;
     } finally {
       closeRegionOperation();
     }
@@ -5124,18 +5146,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       + " does not match this region: " + this.getRegionInfo());
   }
 
-  /**
+  /*
    * Used by tests
   * @param s Store to add edit to.
    * @param cell Cell to add.
-   * @return True if we should flush.
+   * @param memstoreSize accumulator updated with the size of the restored edit
    */
-  protected boolean restoreEdit(final HStore s, final Cell cell) {
-    long kvSize = s.add(cell);
-    if (this.rsAccounting != null) {
-      rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
-    }
-    return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
+  protected void restoreEdit(final HStore s, final Cell cell, MemstoreSize memstoreSize) {
+    s.add(cell, memstoreSize);
   }
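
Replay accounting moves out of restoreEdit and up to its caller, which allows
one accumulator per WALEdit rather than a counter update per cell. A sketch of
the loop this enables, condensed from the replay hunk earlier in this file:

    // Sketch only: accumulate across the edit, then report and check once.
    MemstoreSize memstoreSize = new MemstoreSize();
    for (Cell cell : val.getCells()) {
      restoreEdit(store, cell, memstoreSize);
    }
    if (rsAccounting != null) {
      rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), memstoreSize);
    }
    if (isFlushSize(addAndGetMemstoreSize(memstoreSize))) {
      internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
    }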
 
   /*
@@ -6986,7 +7004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return null;
     }
     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
-    stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
+    stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this
         .memstoreFlushSize)));
     stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
     stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 :
@@ -7035,12 +7053,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     boolean locked;
     List<RowLock> acquiredRowLocks;
-    long addedSize = 0;
     List<Mutation> mutations = new ArrayList<Mutation>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
    // This is assigned by mvcc either explicitly in the below or in the guts of the WAL append
    // when it assigns the edit a sequenceid (A.K.A. the mvcc write number).
     WriteEntry writeEntry = null;
+    MemstoreSize memstoreSize = new MemstoreSize();
     try {
       // STEP 2. Acquire the row lock(s)
       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@@ -7084,7 +7102,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 // If no WAL, need to stamp it here.
                 CellUtil.setSequenceId(cell, sequenceId);
               }
-              addedSize += applyToMemstore(getHStore(cell), cell);
+              applyToMemstore(getHStore(cell), cell, memstoreSize);
             }
           }
           // STEP 8. Complete mvcc.
@@ -7119,7 +7137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       closeRegionOperation();
       if (!mutations.isEmpty()) {
-        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
+        long newSize = this.addAndGetMemstoreSize(memstoreSize);
         requestFlushIfNeeded(newSize);
       }
     }
@@ -7203,7 +7221,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // See HBASE-16304
   @SuppressWarnings("unchecked")
   private void dropMemstoreContents() throws IOException {
-    long totalFreedSize = 0;
+    MemstoreSize totalFreedSize = new MemstoreSize();
     while (!storeSeqIds.isEmpty()) {
       Map<Store, Long> map = null;
       synchronized (storeSeqIds) {
@@ -7212,11 +7230,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       for (Map.Entry<Store, Long> entry : map.entrySet()) {
         // Drop the memstore contents if they are now smaller than the latest seen flushed file
-        totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey());
+        totalFreedSize
+            .incMemstoreSize(dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()));
       }
     }
-    if (totalFreedSize > 0) {
-      LOG.debug("Freed " + totalFreedSize + " bytes from memstore");
+    if (totalFreedSize.getDataSize() > 0) {
+      LOG.debug("Freed " + totalFreedSize.getDataSize() + " bytes from memstore");
     }
   }
 
@@ -7237,8 +7256,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * on the passed in <code>op</code> to do increment or append specific paths.
    */
   private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long nonce,
-      boolean returnResults)
-  throws IOException {
+      boolean returnResults) throws IOException {
     checkReadOnly();
     checkResources();
     checkRow(mutation.getRow(), op.toString());
@@ -7246,9 +7264,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.writeRequestsCount.increment();
     WriteEntry writeEntry = null;
     startRegionOperation(op);
-    long accumulatedResultSize = 0;
     List<Cell> results = returnResults? new ArrayList<Cell>(mutation.size()): null;
     RowLock rowLock = getRowLockInternal(mutation.getRow(), false);
+    MemstoreSize memstoreSize = new MemstoreSize();
     try {
       lock(this.updatesLock.readLock());
       try {
@@ -7273,8 +7291,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
         }
         // Now write to MemStore. Do it a column family at a time.
-        for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) {
-          accumulatedResultSize += applyToMemstore(e.getKey(), e.getValue(), true);
+        for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) {
+          applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize);
         }
         mvcc.completeAndWait(writeEntry);
         if (rsServices != null && rsServices.getNonceManager() != null) {
@@ -7299,7 +7317,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (writeEntry != null) mvcc.complete(writeEntry);
       rowLock.release();
       // Request a cache flush if over the limit.  Do it outside update lock.
-      if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
+      if (isFlushSize(addAndGetMemstoreSize(memstoreSize))) {
+        requestFlush();
+      }
       closeRegionOperation(op);
       if (this.metricsRegion != null) {
         switch (op) {
@@ -8155,7 +8175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     for (Store s : getStores()) {
       buf.append(s.getFamily().getNameAsString());
       buf.append(" size: ");
-      buf.append(s.getMemStoreSize());
+      buf.append(s.getSizeOfMemStore().getDataSize());
       buf.append(" ");
     }
     buf.append("end-of-stores");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index d7b0d36..1dcf060 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
@@ -362,12 +361,26 @@ public class HStore implements Store {
   }
 
   @Override
+  @Deprecated
   public long getFlushableSize() {
+    MemstoreSize size = getSizeToFlush();
+    return size.getDataSize() + size.getHeapOverhead();
+  }
+
+  @Override
+  public MemstoreSize getSizeToFlush() {
     return this.memstore.getFlushableSize();
   }
 
   @Override
+  @Deprecated
   public long getSnapshotSize() {
+    MemstoreSize size = getSizeOfSnapshot();
+    return size.getDataSize() + size.getHeapOverhead();
+  }
+
+  @Override
+  public MemstoreSize getSizeOfSnapshot() {
     return this.memstore.getSnapshotSize();
   }
 
@@ -636,12 +649,12 @@ public class HStore implements Store {
   /**
    * Adds a value to the memstore
    * @param cell
-   * @return memstore size delta
+   * @param memstoreSize accumulator updated with the memstore size delta of this add
    */
-  public long add(final Cell cell) {
+  public void add(final Cell cell, MemstoreSize memstoreSize) {
     lock.readLock().lock();
     try {
-       return this.memstore.add(cell);
+       this.memstore.add(cell, memstoreSize);
     } finally {
       lock.readLock().unlock();
     }
@@ -650,12 +663,12 @@ public class HStore implements Store {
   /**
    * Adds the specified value to the memstore
    * @param cells
-   * @return memstore size delta
+   * @param memstoreSize accumulator updated with the memstore size delta of these adds
    */
-  public long add(final Iterable<Cell> cells) {
+  public void add(final Iterable<Cell> cells, MemstoreSize memstoreSize) {
     lock.readLock().lock();
     try {
-      return memstore.add(cells);
+      memstore.add(cells, memstoreSize);
     } finally {
       lock.readLock().unlock();
     }
@@ -667,21 +680,6 @@ public class HStore implements Store {
   }
 
   /**
-   * Adds a value to the memstore
-   *
-   * @param kv
-   * @return memstore size delta
-   */
-  protected long delete(final KeyValue kv) {
-    lock.readLock().lock();
-    try {
-      return this.memstore.delete(kv);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
    * @return All store files.
    */
   @Override
@@ -2026,7 +2024,14 @@ public class HStore implements Store {
   }
 
   @Override
+  @Deprecated
   public long getMemStoreSize() {
+    MemstoreSize size = getSizeOfMemStore();
+    return size.getDataSize() + size.getHeapOverhead();
+  }
+
+  @Override
+  public MemstoreSize getSizeOfMemStore() {
     return this.memstore.size();
   }
 
@@ -2069,37 +2074,6 @@ public class HStore implements Store {
   }
 
   /**
-   * Updates the value for the given row/family/qualifier. This function will always be seen as
-   * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
-   * control necessary.
-   * @param row row to update
-   * @param f family to update
-   * @param qualifier qualifier to update
-   * @param newValue the new value to set into memstore
-   * @return memstore size delta
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public long updateColumnValue(byte [] row, byte [] f,
-                                byte [] qualifier, long newValue)
-      throws IOException {
-
-    this.lock.readLock().lock();
-    try {
-      long now = EnvironmentEdgeManager.currentTime();
-
-      return this.memstore.updateColumnValue(row,
-          f,
-          qualifier,
-          newValue,
-          now);
-
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
    * Adds or replaces the specified KeyValues.
    * <p>
    * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
@@ -2109,13 +2083,14 @@ public class HStore implements Store {
    * across all of them.
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return memstore size delta
+   * @param memstoreSize
    * @throws IOException
    */
-  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
+  public void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize)
+      throws IOException {
     this.lock.readLock().lock();
     try {
-      return this.memstore.upsert(cells, readpoint);
+      this.memstore.upsert(cells, readpoint, memstoreSize);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -2149,7 +2124,7 @@ public class HStore implements Store {
       // passing the current sequence number of the wal - to allow bookkeeping in the memstore
       this.snapshot = memstore.snapshot();
       this.cacheFlushCount = snapshot.getCellsCount();
-      this.cacheFlushSize = snapshot.getSize();
+      this.cacheFlushSize = snapshot.getDataSize();
       committedFiles = new ArrayList<Path>(1);
     }
 
@@ -2282,7 +2257,8 @@ public class HStore implements Store {
 
   @Override
   public long heapSize() {
-    return DEEP_OVERHEAD + this.memstore.heapSize();
+    MemstoreSize memstoreSize = this.memstore.size();
+    return DEEP_OVERHEAD + memstoreSize.getDataSize() + memstoreSize.getHeapOverhead();
   }
 
   @Override

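The HStore changes above replace the old long-returning mutation methods with an accumulator pattern. A minimal sketch of the intended call shape, assuming a Store reference and some Cells already in hand (illustrative only, not part of the patch):

    MemstoreSize delta = new MemstoreSize();
    store.add(cell1, delta);   // each call increments the accumulator in place
    store.add(cell2, delta);
    long dataBytes = delta.getDataSize();          // key/value/tags bytes
    long overheadBytes = delta.getHeapOverhead();  // Java object and map-entry overhead

With the two components kept separate, heapSize() can still report a single number by summing them, which is exactly what the new implementation above does.
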
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 6226cf2..7646293 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -298,8 +298,10 @@ public class HeapMemoryManager {
       metricsHeapMemoryManager.updateUnblockedFlushCount(unblockedFlushCnt);
       tunerContext.setCurBlockCacheUsed((float) blockCache.getCurrentSize() / maxHeapSize);
       metricsHeapMemoryManager.setCurBlockCacheSizeGauge(blockCache.getCurrentSize());
-      tunerContext.setCurMemStoreUsed((float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
-      metricsHeapMemoryManager.setCurMemStoreSizeGauge(regionServerAccounting.getGlobalMemstoreSize());
+      long globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize()
+          + regionServerAccounting.getGlobalMemstoreHeapOverhead();
+      tunerContext.setCurMemStoreUsed((float) globalMemstoreHeapSize / maxHeapSize);
+      metricsHeapMemoryManager.setCurMemStoreSizeGauge(globalMemstoreHeapSize);
       tunerContext.setCurBlockCacheSize(blockCachePercent);
       tunerContext.setCurMemStoreSize(globalMemStorePercent);
       TunerResult result = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 8e79ad5..4cdb29d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -21,7 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.client.Scan;
@@ -108,13 +109,14 @@ public class ImmutableSegment extends Segment {
     super(new CellSet(comparator), // initialize the CellSet with an empty CellSet
         comparator, memStoreLAB);
     type = Type.SKIPLIST_MAP_BASED;
+    MemstoreSize memstoreSize = new MemstoreSize();
     while (iterator.hasNext()) {
       Cell c = iterator.next();
       // The scanner is doing all the elimination logic
       // now we just copy it to the new segment
       Cell newKV = maybeCloneWithAllocator(c);
       boolean usedMSLAB = (newKV != c);
-      internalAdd(newKV, usedMSLAB); //
+      internalAdd(newKV, usedMSLAB, memstoreSize);
     }
     this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
   }
@@ -140,19 +142,6 @@ public class ImmutableSegment extends Segment {
     return this.timeRange.getMin();
   }
 
-
-  @Override
-  public long size() {
-    switch (this.type) {
-    case SKIPLIST_MAP_BASED:
-      return keySize() + DEEP_OVERHEAD_CSLM;
-    case ARRAY_MAP_BASED:
-      return keySize() + DEEP_OVERHEAD_CAM;
-    default:
-      throw new RuntimeException("Unknown type " + type);
-    }
-  }
-
   /**------------------------------------------------------------------------
    * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
    * based on CellArrayMap.
@@ -164,7 +153,7 @@ public class ImmutableSegment extends Segment {
    * thread of compaction, but to be on the safe side the initial CellSet is locally saved
    * before the flattening and then replaced using CAS instruction.
    */
-  public boolean flatten() {
+  public boolean flatten(MemstoreSize memstoreSize) {
     if (isFlat()) return false;
     CellSet oldCellSet = getCellSet();
     int numOfCells = getCellsCount();
@@ -176,12 +165,13 @@ public class ImmutableSegment extends Segment {
 
     // arrange the meta-data size, decrease all meta-data sizes related to SkipList
     // (recreateCellArrayMapSet doesn't take care of the sizes)
-    long newSegmentSizeDelta = -(ClassSize.CONCURRENT_SKIPLISTMAP +
-        numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
     // add size of CellArrayMap and meta-data overhead per Cell
-    newSegmentSizeDelta = newSegmentSizeDelta + ClassSize.CELL_ARRAY_MAP +
-        numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
-    incSize(newSegmentSizeDelta);
+    newSegmentSizeDelta = newSegmentSizeDelta + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
+    incSize(0, newSegmentSizeDelta);
+    if (memstoreSize != null) {
+      memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+    }
 
     return true;
   }
@@ -208,7 +198,7 @@ public class ImmutableSegment extends Segment {
       boolean useMSLAB = (getMemStoreLAB()!=null);
       // the second parameter is true because, during compaction, adding the cell to the
       // new segment always succeeds
-      updateMetaInfo(c, true, useMSLAB); // updates the size per cell
+      updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
       i++;
     }
     // build the immutable CellSet
@@ -216,14 +206,18 @@ public class ImmutableSegment extends Segment {
     return new CellSet(cam);
   }
 
-  protected long heapSizeChange(Cell cell, boolean succ) {
+  @Override
+  protected long heapOverheadChange(Cell cell, boolean succ) {
     if (succ) {
       switch (this.type) {
       case SKIPLIST_MAP_BASED:
-        return ClassSize
-            .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+        return super.heapOverheadChange(cell, succ);
       case ARRAY_MAP_BASED:
-        return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+        if (cell instanceof ExtendedCell) {
+          return ClassSize
+              .align(ClassSize.CELL_ARRAY_MAP_ENTRY + ((ExtendedCell) cell).heapOverhead());
+        }
+        return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD);
       }
     }
     return 0;

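To make the flatten() bookkeeping concrete: only the heap overhead component changes when a skip-list based segment is flattened into a CellArrayMap, which is why the code calls incSize(0, newSegmentSizeDelta). A worked sketch with illustrative constants (not the real ClassSize values):

    // Assume CONCURRENT_SKIPLISTMAP_ENTRY = 120, CELL_ARRAY_MAP_ENTRY = 64, numOfCells = 1000.
    long newSegmentSizeDelta = -(1000 * 120)   // overhead released with the skip-list entries
                             +  (1000 * 64);   // overhead taken by the cell-array entries
    // newSegmentSizeDelta = -56000; the data size component is untouched by flattening.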

[3/4] hbase git commit: HBASE-16747 Track memstore data size and heap overhead separately.

Posted by an...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index bcaf3a2..b094476 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.io.HeapSize;
 
 /**
  * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
@@ -33,7 +32,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
  * </p>
  */
 @InterfaceAudience.Private
-public interface MemStore extends HeapSize {
+public interface MemStore {
 
   /**
    * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
@@ -58,27 +57,29 @@ public interface MemStore extends HeapSize {
    *
    * @return size of data that is going to be flushed
    */
-  long getFlushableSize();
+  MemstoreSize getFlushableSize();
 
   /**
    * Return the size of the snapshot(s) if any
    * @return size of the memstore snapshot
    */
-  long getSnapshotSize();
+  MemstoreSize getSnapshotSize();
 
   /**
    * Write an update
    * @param cell
-   * @return approximate size of the passed cell.
+   * @param memstoreSize The delta in memstore size will be passed back via this object.
+   *        It includes both the data size and the heap overhead deltas.
    */
-  long add(final Cell cell);
+  void add(final Cell cell, MemstoreSize memstoreSize);
 
   /**
    * Write the updates
    * @param cells
-   * @return approximate size of the passed cell.
+   * @param memstoreSize The delta in memstore size will be passed back via this object.
+   *        It includes both the data size and the heap overhead deltas.
    */
-  long add(Iterable<Cell> cells);
+  void add(Iterable<Cell> cells, MemstoreSize memstoreSize);
 
   /**
    * @return Oldest timestamp of all the Cells in the MemStore
@@ -86,30 +87,6 @@ public interface MemStore extends HeapSize {
   long timeOfOldestEdit();
 
   /**
-   * Write a delete
-   * @param deleteCell
-   * @return approximate size of the passed key and value.
-   */
-  long delete(final Cell deleteCell);
-
-  /**
-   * Given the specs of a column, update it, first by inserting a new record,
-   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
-   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
-   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
-   * get the new value, or the old value and all readers will eventually only see the new
-   * value after the old was removed.
-   *
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param newValue
-   * @param now
-   * @return Timestamp
-   */
-  long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, long now);
-
-  /**
    * Update or insert the specified cells.
    * <p>
    * For each Cell, insert into MemStore. This will atomically upsert the value for that
@@ -122,9 +99,10 @@ public interface MemStore extends HeapSize {
    * only see each KeyValue update as atomic.
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate Cells.
-   * @return change in memstore size
+   * @param memstoreSize The delta in memstore size will be passed back via this object.
+   *        It includes both the data size and the heap overhead deltas.
    */
-  long upsert(Iterable<Cell> cells, long readpoint);
+  void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize);
 
   /**
    * @return scanner over the memstore. This might include scanner over the snapshot when one is
@@ -133,13 +111,12 @@ public interface MemStore extends HeapSize {
   List<KeyValueScanner> getScanners(long readPt) throws IOException;
 
   /**
-   * @return Total memory occupied by this MemStore. This includes active segment size and heap size
-   *         overhead of this memstore but won't include any size occupied by the snapshot. We
-   *         assume the snapshot will get cleared soon. This is not thread safe and the memstore may
-   *         be changed while computing its size. It is the responsibility of the caller to make
-   *         sure this doesn't happen.
+   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
+   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
+   *         the memstore may be changed while computing its size. It is the responsibility of the
+   *         caller to make sure this doesn't happen.
    */
-  long size();
+  MemstoreSize size();
 
   /**
    * This method is called when it is clear that the flush to disk is completed.

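Two consequences of this interface change are worth spelling out. First, MemStore no longer implements HeapSize, so a caller that wants the total heap footprint derives it from the two counters. Second, callers that do not care about the delta may pass null for the accumulator, as many of the updated tests below do. A sketch under those assumptions:

    MemstoreSize size = memStore.size();
    long heapFootprint = size.getDataSize() + size.getHeapOverhead();
    memStore.add(cell, null);  // delta not needed; the memstore still updates its own counters
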
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 8f78a3b..2f4d225 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -197,9 +197,12 @@ class MemStoreFlusher implements FlushRequester {
            ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
            (bestRegionReplica.getMemstoreSize()
                > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
-        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
-          " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
-            server.getRegionServerAccounting().getGlobalMemstoreSize()));
+        LOG.info("Refreshing storefiles of region " + bestRegionReplica
+            + " due to global heap pressure. Total memstore size="
+            + StringUtils
+                .humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+            + " memstore heap overhead=" + StringUtils.humanReadableInt(
+                server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead()));
         flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
         if (!flushedOne) {
           LOG.info("Excluding secondary region " + bestRegionReplica +
@@ -343,16 +346,16 @@ class MemStoreFlusher implements FlushRequester {
    * Return true if global memory usage is above the high watermark
    */
   private boolean isAboveHighWaterMark() {
-    return server.getRegionServerAccounting().
-      getGlobalMemstoreSize() >= globalMemStoreLimit;
+    return server.getRegionServerAccounting().getGlobalMemstoreSize()
+        + server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit;
   }
 
   /**
   * Return true if we're above the low watermark
    */
   private boolean isAboveLowWaterMark() {
-    return server.getRegionServerAccounting().
-      getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
+    return server.getRegionServerAccounting().getGlobalMemstoreSize() + server
+        .getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark;
   }
 
   @Override
@@ -586,11 +589,13 @@ class MemStoreFlusher implements FlushRequester {
           while (isAboveHighWaterMark() && !server.isStopped()) {
             if (!blocked) {
               startTime = EnvironmentEdgeManager.currentTime();
-              LOG.info("Blocking updates on "
-                  + server.toString()
-                  + ": the global memstore size "
-                  + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
-                      .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
+              LOG.info("Blocking updates on " + server.toString() + ": the global memstore size "
+                  + TraditionalBinaryPrefix.long2String(
+                      server.getRegionServerAccounting().getGlobalMemstoreSize(), "", 1)
+                  + " + global memstore heap overhead "
+                  + TraditionalBinaryPrefix.long2String(
+                      server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1)
+                  + " is >= blocking "
                   + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
             }
             blocked = true;

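The watermark checks now measure occupancy as data size plus heap overhead. A condensed sketch of the logic above (field and accessor names as in the diff):

    RegionServerAccounting acc = server.getRegionServerAccounting();
    long occupied = acc.getGlobalMemstoreSize() + acc.getGlobalMemstoreHeapOverhead();
    boolean aboveHigh = occupied >= globalMemStoreLimit;         // block updates, force flushes
    boolean aboveLow  = occupied >= globalMemStoreLimitLowMark;  // keep flushing until below
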
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index 1bb4511..74d1e17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public class MemStoreSnapshot {
   private final long id;
   private final int cellsCount;
-  private final long size;
+  private final long dataSize;
+  private final long heapOverhead;
   private final TimeRangeTracker timeRangeTracker;
   private final KeyValueScanner scanner;
   private final boolean tagsPresent;
@@ -36,7 +37,8 @@ public class MemStoreSnapshot {
   public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
     this.id = id;
     this.cellsCount = snapshot.getCellsCount();
-    this.size = snapshot.keySize();
+    this.dataSize = snapshot.keySize();
+    this.heapOverhead = snapshot.heapOverhead();
     this.timeRangeTracker = snapshot.getTimeRangeTracker();
     this.scanner = snapshot.getKeyValueScanner();
     this.tagsPresent = snapshot.isTagsPresent();
@@ -59,8 +61,12 @@ public class MemStoreSnapshot {
   /**
   * @return Total data size (key, value and tags bytes) occupied by this snapshot.
    */
-  public long getSize() {
-    return size;
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public long getHeapOverhead() {
+    return this.heapOverhead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
new file mode 100644
index 0000000..77cea51
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Wraps the data size and the heap overhead parts of the memstore size.
+ */
+@InterfaceAudience.Private
+public class MemstoreSize {
+
+  static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
+
+  private long dataSize;
+  private long heapOverhead;
+
+  public MemstoreSize() {
+    dataSize = 0;
+    heapOverhead = 0;
+  }
+
+  public MemstoreSize(long dataSize, long heapOverhead) {
+    this.dataSize = dataSize;
+    this.heapOverhead = heapOverhead;
+  }
+
+  public void incMemstoreSize(long dataSize, long heapOverhead) {
+    this.dataSize += dataSize;
+    this.heapOverhead += heapOverhead;
+  }
+
+  public void incMemstoreSize(MemstoreSize size) {
+    this.dataSize += size.dataSize;
+    this.heapOverhead += size.heapOverhead;
+  }
+
+  public void decMemstoreSize(long dataSize, long heapOverhead) {
+    this.dataSize -= dataSize;
+    this.heapOverhead -= heapOverhead;
+  }
+
+  public void decMemstoreSize(MemstoreSize size) {
+    this.dataSize -= size.dataSize;
+    this.heapOverhead -= size.heapOverhead;
+  }
+
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public long getHeapOverhead() {
+    return heapOverhead;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof MemstoreSize)) { // instanceof is false for null
+      return false;
+    }
+    MemstoreSize other = (MemstoreSize) obj;
+    return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
+  }
+
+  @Override
+  public int hashCode() {
+    long h = 13 * this.dataSize;
+    h = h + 14 * this.heapOverhead;
+    return (int) h;
+  }
+
+  @Override
+  public String toString() {
+    return "dataSize=" + this.dataSize + " , heapOverhead=" + this.heapOverhead;
+  }
+}

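A short usage sketch for the new class (values are arbitrary):

    MemstoreSize ms = new MemstoreSize(100, 48);     // 100 bytes of data, 48 of overhead
    ms.incMemstoreSize(50, 16);                      // now 150 / 64
    ms.decMemstoreSize(new MemstoreSize(150, 64));   // back to 0 / 0
    assert ms.equals(new MemstoreSize());            // equality is field-wise

Note that instances are mutable and unsynchronized; as the comment in RegionServerAccounting below spells out, an accumulator is meant to stay confined to a single thread.
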
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 3ab1dba..882044c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -685,7 +685,7 @@ class MetricsRegionServerWrapperImpl
           tempNumStores += storeList.size();
           for (Store store : storeList) {
             tempNumStoreFiles += store.getStorefilesCount();
-            tempMemstoreSize += store.getMemStoreSize();
+            tempMemstoreSize += store.getSizeOfMemStore().getDataSize();
             tempStoreFileSize += store.getStorefilesSize();
 
             long storeMaxStoreFileAge = store.getMaxStoreFileAge();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index aed75c2..ecee53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -203,7 +203,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
       if (region.stores != null) {
         for (Store store : region.stores.values()) {
           tempNumStoreFiles += store.getStorefilesCount();
-          tempMemstoreSize += store.getMemStoreSize();
+          tempMemstoreSize += store.getSizeOfMemStore().getDataSize();
           tempStoreFileSize += store.getStorefilesSize();
 
           long storeMaxStoreFileAge = store.getMaxStoreFileAge();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index 3fb9723..cdc11a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -18,8 +18,14 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Iterator;
+import java.util.SortedSet;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -42,10 +48,60 @@ public class MutableSegment extends Segment {
    * Adds the given cell into the segment
    * @param cell the cell to add
    * @param mslabUsed whether using MSLAB
-   * @return the change in the heap size
+   * @param memstoreSize
    */
-  public long add(Cell cell, boolean mslabUsed) {
-    return internalAdd(cell, mslabUsed);
+  public void add(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
+    internalAdd(cell, mslabUsed, memstoreSize);
+  }
+
+  public void upsert(Cell cell, long readpoint, MemstoreSize memstoreSize) {
+    internalAdd(cell, false, memstoreSize);
+
+    // Get the Cells for the row/family/qualifier regardless of timestamp.
+    // For this case we want to clean up any other puts
+    Cell firstCell = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+    SortedSet<Cell> ss = this.tailSet(firstCell);
+    Iterator<Cell> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
+    while (it.hasNext()) {
+      Cell cur = it.next();
+
+      if (cell == cur) {
+        // ignore the one just put in
+        continue;
+      }
+      // check that this is the row and column we are interested in, otherwise bail
+      if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
+          if (versionsVisible >= 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+
+            // This older Put is provably invisible to all scanners; remove it and account for
+            // the freed size. TODO: when the removed cell, i.e. 'cur', has its data in MSLAB, we
+            // cannot release that area; only the Cell object itself goes away, so cellLen should
+            // be counted as 0 here. Keeping the existing behavior for now; we need to know whether
+            // the removed cell came from MSLAB or not. Will do once HBASE-16438 is in.
+            int cellLen = getCellLength(cur);
+            long heapOverheadDelta = heapOverheadChange(cur, true);
+            this.incSize(-cellLen, -heapOverheadDelta);
+            if (memstoreSize != null) {
+              memstoreSize.decMemstoreSize(cellLen, heapOverheadDelta);
+            }
+            it.remove();
+          } else {
+            versionsVisible++;
+          }
+        }
+      } else {
+        // past the row or column, done
+        break;
+      }
+    }
   }
 
   /**
@@ -67,9 +123,4 @@ public class MutableSegment extends Segment {
   public long getMinTimestamp() {
     return this.timeRangeTracker.getMin();
   }
-
-  @Override
-  public long size() {
-    return keySize() + DEEP_OVERHEAD;
-  }
 }

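A worked example of the version pruning in upsert() above, under assumed sequence ids (and timestamps ordered the same way): say the segment holds Puts for one row/qualifier with sequence ids 8, 6 and 4, the readpoint is 7, and the Put just inserted has sequence id 9.

    // Iteration over the tail set, newest first:
    //   put(seq=9) -> skipped: it is the cell that was just inserted (cell == cur)
    //   put(seq=8) -> seq > readpoint(7): a concurrent scanner may still need it; kept
    //   put(seq=6) -> seq <= 7 and versionsVisible == 0: first visible version; kept
    //   put(seq=4) -> seq <= 7 and versionsVisible >= 1: provably invisible; removed, and
    //                 both the segment counters and the passed-in memstoreSize are decremented
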
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 3a5acfe..1b106b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -195,7 +195,11 @@ public interface Region extends ConfigurationObserver {
    */
   void updateWriteRequestsCount(long i);
 
-  /** @return memstore size for this region, in bytes */
+  /**
+   * @return memstore size for this region, in bytes. This accounts only the data size of cells
+   *         added to the memstores of this Region, i.e. the bytes of key, value and tags within
+   *         the Cells. It does not include any Java heap overhead for the Cell objects or otherwise.
+   */
   long getMemstoreSize();
 
   /** @return store services for this region, to access services required by store level needs */

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 879b573..cb8551f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -32,43 +32,57 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 public class RegionServerAccounting {
 
-  private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0);
-  
+  private final AtomicLong globalMemstoreDataSize = new AtomicLong(0);
+  private final AtomicLong globalMemstoreHeapOverhead = new AtomicLong(0);
+
   // Store the edits size during WAL replay. Use this to roll back the
   // global memstore size if a region opening fails.
-  private final ConcurrentMap<byte[], AtomicLong> replayEditsPerRegion = 
-    new ConcurrentSkipListMap<byte[], AtomicLong>(Bytes.BYTES_COMPARATOR);
+  private final ConcurrentMap<byte[], MemstoreSize> replayEditsPerRegion =
+    new ConcurrentSkipListMap<byte[], MemstoreSize>(Bytes.BYTES_COMPARATOR);
 
   /**
    * @return the global Memstore size in the RegionServer
    */
   public long getGlobalMemstoreSize() {
-    return atomicGlobalMemstoreSize.get();
+    return globalMemstoreDataSize.get();
+  }
+
+  public long getGlobalMemstoreHeapOverhead() {
+    return this.globalMemstoreHeapOverhead.get();
   }
-  
+
   /**
    * @param memStoreSize the Memstore size to be added to
    *        the global Memstore size
-   * @return the global Memstore size in the RegionServer 
    */
-  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
-    return atomicGlobalMemstoreSize.addAndGet(memStoreSize);
+  public void incGlobalMemstoreSize(MemstoreSize memStoreSize) {
+    globalMemstoreDataSize.addAndGet(memStoreSize.getDataSize());
+    globalMemstoreHeapOverhead.addAndGet(memStoreSize.getHeapOverhead());
   }
-  
+
+  public void decGlobalMemstoreSize(MemstoreSize memStoreSize) {
+    globalMemstoreDataSize.addAndGet(-memStoreSize.getDataSize());
+    globalMemstoreHeapOverhead.addAndGet(-memStoreSize.getHeapOverhead());
+  }
+
   /**
    * Add memStoreSize to replayEditsPerRegion.
    * 
    * @param regionName region name.
    * @param memStoreSize the Memstore size to be added to replayEditsPerRegion.
-   * @return the replay edits size for the region.
    */
-  public long addAndGetRegionReplayEditsSize(byte[] regionName, long memStoreSize) {
-    AtomicLong replayEdistsSize = replayEditsPerRegion.get(regionName);
+  public void addRegionReplayEditsSize(byte[] regionName, MemstoreSize memStoreSize) {
+    MemstoreSize replayEdistsSize = replayEditsPerRegion.get(regionName);
+    // All ops on the same MemstoreSize object are done by a single thread, sequentially: first
+    // come calls to this method to increment the per-region replay edits size, followed by a call
+    // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize depending on the
+    // result of the region open operation. So there is no need to handle multi-threading issues
+    // on one region's entry in this Map.
     if (replayEdistsSize == null) {
-      replayEdistsSize = new AtomicLong(0);
+      replayEdistsSize = new MemstoreSize();
       replayEditsPerRegion.put(regionName, replayEdistsSize);
     }
-    return replayEdistsSize.addAndGet(memStoreSize);
+    replayEdistsSize.incMemstoreSize(memStoreSize);
   }
 
   /**
@@ -76,16 +90,13 @@ public class RegionServerAccounting {
    * can't be opened.
    * 
    * @param regionName the region which could not open.
-   * @return the global Memstore size in the RegionServer
    */
-  public long rollbackRegionReplayEditsSize(byte[] regionName) {
-    AtomicLong replayEditsSize = replayEditsPerRegion.get(regionName);
-    long editsSizeLong = 0L;
+  public void rollbackRegionReplayEditsSize(byte[] regionName) {
+    MemstoreSize replayEditsSize = replayEditsPerRegion.get(regionName);
     if (replayEditsSize != null) {
-      editsSizeLong = -replayEditsSize.get();
       clearRegionReplayEditsSize(regionName);
+      decGlobalMemstoreSize(replayEditsSize);
     }
-    return addAndGetGlobalMemstoreSize(editsSizeLong);
   }
 
   /**
@@ -96,5 +107,4 @@ public class RegionServerAccounting {
   public void clearRegionReplayEditsSize(byte[] regionName) {
     replayEditsPerRegion.remove(regionName);
   }
-  
 }

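A hypothetical region-open flow tying the reworked accounting together (regionName, dataBytes, overheadBytes and openFailed are illustrative placeholders):

    MemstoreSize replayed = new MemstoreSize(dataBytes, overheadBytes);
    accounting.incGlobalMemstoreSize(replayed);                 // global counters go up
    accounting.addRegionReplayEditsSize(regionName, replayed);  // remembered per region
    if (openFailed) {
      accounting.rollbackRegionReplayEditsSize(regionName);     // undoes the global increment
    } else {
      accounting.clearRegionReplayEditsSize(regionName);        // open succeeded; sizes stand
    }
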
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index e481a63..c7f2ce6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -25,8 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.hadoop.hbase.wal.WAL;
 
 /**
@@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.wal.WAL;
  * take occasional lock and update size counters at the region level.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public class RegionServicesForStores {
 
   private static final int POOL_SIZE = 10;
@@ -68,8 +65,8 @@ public class RegionServicesForStores {
     region.unblockUpdates();
   }
 
-  public long addAndGetGlobalMemstoreSize(long size) {
-    return region.addAndGetGlobalMemstoreSize(size);
+  public void addMemstoreSize(MemstoreSize size) {
+    region.addAndGetMemstoreSize(size);
   }
 
   public HRegionInfo getRegionInfo() {
@@ -91,7 +88,7 @@ public class RegionServicesForStores {
   }
 
   // methods for tests
-  long getGlobalMemstoreTotalSize() {
+  long getMemstoreSize() {
     return region.getMemstoreSize();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 864e256..afdfe6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -55,11 +55,12 @@ public abstract class Segment {
 
   private AtomicReference<CellSet> cellSet= new AtomicReference<CellSet>();
   private final CellComparator comparator;
-  private long minSequenceId;
+  protected long minSequenceId;
   private MemStoreLAB memStoreLAB;
  // Sum of the data sizes (key/value/tags bytes) of all Cells added to this Segment, plus a
  // separate counter for their heap overhead. The overhead of this class itself is not included.
-  protected final AtomicLong size;
+  protected final AtomicLong dataSize;
+  protected final AtomicLong heapOverhead;
   protected final TimeRangeTracker timeRangeTracker;
   protected volatile boolean tagsPresent;
 
@@ -69,7 +70,8 @@ public abstract class Segment {
     this.comparator = comparator;
     this.minSequenceId = Long.MAX_VALUE;
     this.memStoreLAB = memStoreLAB;
-    this.size = new AtomicLong(0);
+    this.dataSize = new AtomicLong(0);
+    this.heapOverhead = new AtomicLong(0);
     this.tagsPresent = false;
     this.timeRangeTracker = new TimeRangeTracker();
   }
@@ -79,7 +81,8 @@ public abstract class Segment {
     this.comparator = segment.getComparator();
     this.minSequenceId = segment.getMinSequenceId();
     this.memStoreLAB = segment.getMemStoreLAB();
-    this.size = new AtomicLong(segment.keySize());
+    this.dataSize = new AtomicLong(segment.keySize());
+    this.heapOverhead = new AtomicLong(segment.heapOverhead.get());
     this.tagsPresent = segment.isTagsPresent();
     this.timeRangeTracker = segment.getTimeRangeTracker();
   }
@@ -154,7 +157,7 @@ public abstract class Segment {
    * Get cell length after serialized in {@link KeyValue}
    */
   @VisibleForTesting
-  int getCellLength(Cell cell) {
+  static int getCellLength(Cell cell) {
     return KeyValueUtil.length(cell);
   }
 
@@ -193,19 +196,26 @@ public abstract class Segment {
   * @return Sum of all cells' data sizes.
    */
   public long keySize() {
-    return this.size.get();
+    return this.dataSize.get();
   }
 
   /**
-   * @return the heap size of the segment
+   * @return The heap overhead of this segment.
    */
-  public abstract long size();
+  public long heapOverhead() {
+    return this.heapOverhead.get();
+  }
 
   /**
   * Updates the data size and heap overhead counters of the segment by the given deltas
    */
-  public void incSize(long delta) {
-    this.size.addAndGet(delta);
+  protected void incSize(long delta, long heapOverhead) {
+    this.dataSize.addAndGet(delta);
+    this.heapOverhead.addAndGet(heapOverhead);
+  }
+
+  protected void incHeapOverheadSize(long delta) {
+    this.heapOverhead.addAndGet(delta);
   }
 
   public long getMinSequenceId() {
@@ -252,36 +262,47 @@ public abstract class Segment {
     return comparator;
   }
 
-  protected long internalAdd(Cell cell, boolean mslabUsed) {
+  protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
     boolean succ = getCellSet().add(cell);
-    long s = updateMetaInfo(cell, succ, mslabUsed);
-    return s;
+    updateMetaInfo(cell, succ, mslabUsed, memstoreSize);
   }
 
-  protected long updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed) {
-    long s = heapSizeChange(cellToAdd, succ);
+  protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
+      MemstoreSize memstoreSize) {
+    long cellSize = 0;
     // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
     // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
     // than the counted number)
-    if (!succ && mslabUsed) {
-      s += getCellLength(cellToAdd);
+    if (succ || mslabUsed) {
+      cellSize = getCellLength(cellToAdd);
+    }
+    long overhead = heapOverheadChange(cellToAdd, succ);
+    incSize(cellSize, overhead);
+    if (memstoreSize != null) {
+      memstoreSize.incMemstoreSize(cellSize, overhead);
     }
     getTimeRangeTracker().includeTimestamp(cellToAdd);
-    incSize(s);
     minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
     // In the no-tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
     // When we use ACL CP or Visibility CP, which deal with Tags during
     // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
     // parse the byte[] to identify the tags length.
-    if( cellToAdd.getTagsLength() > 0) {
+    if (cellToAdd.getTagsLength() > 0) {
       tagsPresent = true;
     }
-    return s;
   }
 
-  protected long heapSizeChange(Cell cell, boolean succ) {
-    return succ ? ClassSize
-        .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)) : 0;
+  protected long heapOverheadChange(Cell cell, boolean succ) {
+    if (succ) {
+      if (cell instanceof ExtendedCell) {
+        return ClassSize
+            .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ((ExtendedCell) cell).heapOverhead());
+      }
+      // All cells on the server side should be of type ExtendedCell. If not, fall back to an
+      // estimate of the heap overhead, assuming it is a KeyValue.
+      return ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD);
+    }
+    return 0;
   }
 
   /**
@@ -314,7 +335,7 @@ public abstract class Segment {
     res += "isEmpty "+(isEmpty()?"yes":"no")+"; ";
     res += "cellCount "+getCellsCount()+"; ";
     res += "cellsSize "+keySize()+"; ";
-    res += "heapSize "+size()+"; ";
+    res += "heapOverhead "+heapOverhead()+"; ";
     res += "Min ts "+getMinTimestamp()+"; ";
     return res;
   }

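The subtle case in updateMetaInfo() above is a duplicate cell added through MSLAB: the CellSet add fails, yet chunk space was already consumed. A pseudo-trace of that branch:

    // succ == false (an equal cell already sits in the CellSet), mslabUsed == true
    // cellSize = getCellLength(cellToAdd);  // the MSLAB chunk space is occupied regardless
    // overhead = heapOverheadChange(cellToAdd, false) == 0  // no new map entry is retained
    // incSize(cellSize, 0) and, if non-null, memstoreSize.incMemstoreSize(cellSize, 0)

Without counting cellSize here, the tracked size would drift below the real heap occupancy, which is the leak the in-code comment warns about.
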
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index ef6e400..30e6a74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -255,22 +255,45 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
 
   /**
    * @return The size of this store's memstore, in bytes
+   * @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeOfMemStore()} instead.
    */
+  @Deprecated
   long getMemStoreSize();
 
   /**
+   * @return The size of this store's memstore.
+   */
+  MemstoreSize getSizeOfMemStore();
+
+  /**
    * @return The amount of memory we could flush from this memstore; usually this is equal to
    * {@link #getMemStoreSize()} unless we are carrying snapshots and then it will be the size of
    * outstanding snapshots.
+   * @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeToFlush()} instead.
    */
+  @Deprecated
   long getFlushableSize();
 
   /**
+   * @return The amount of memory we could flush from this memstore; usually this is equal to
+   * {@link #getSizeOfMemStore()} unless we are carrying snapshots and then it will be the size of
+   * outstanding snapshots.
+   */
+  MemstoreSize getSizeToFlush();
+
+  /**
    * Returns the memstore snapshot size
    * @return size of the memstore snapshot
+   * @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeOfSnapshot()} instead.
    */
+  @Deprecated
   long getSnapshotSize();
 
+  /**
+   * @return size of the memstore snapshot
+   */
+  MemstoreSize getSizeOfSnapshot();
+
   HColumnDescriptor getFamily();
 
   /**

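For callers migrating off the deprecated accessors, the old single number is recoverable from the new pair; a sketch assuming a Store reference:

    long legacy = store.getMemStoreSize();        // deprecated since 2.0
    MemstoreSize ms = store.getSizeOfMemStore();  // preferred replacement
    long sameValue = ms.getDataSize() + ms.getHeapOverhead();  // equals 'legacy'
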
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index f4fd603..53488ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MemstoreSize;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -61,7 +62,7 @@ public class TestClientPushback {
   private static final TableName tableName = TableName.valueOf("client-pushback");
   private static final byte[] family = Bytes.toBytes("f");
   private static final byte[] qualifier = Bytes.toBytes("q");
-  private static final long flushSizeBytes = 1024;
+  private static final long flushSizeBytes = 256;
 
   @BeforeClass
   public static void setupCluster() throws Exception{
@@ -103,7 +104,8 @@ public class TestClientPushback {
     table.put(p);
 
     // get the current load on RS. Hopefully the memstore isn't flushed since we wrote the data
-    int load = (int)((((HRegion)region).addAndGetGlobalMemstoreSize(0) * 100) / flushSizeBytes);
+    int load = (int) ((((HRegion) region).addAndGetMemstoreSize(new MemstoreSize(0, 0)) * 100)
+        / flushSizeBytes);
     LOG.debug("Done writing some data to "+tableName);
 
     // get the stats for the region hosting our table

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 84efb09..7dd9479 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -40,6 +38,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
@@ -121,8 +120,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     final KeyValue kv2 = new KeyValue(two, f, q, 10, v);
 
     // use case 1: both kvs in kvset
-    this.memstore.add(kv1.clone());
-    this.memstore.add(kv2.clone());
+    this.memstore.add(kv1.clone(), null);
+    this.memstore.add(kv2.clone(), null);
     verifyScanAcrossSnapshot2(kv1, kv2);
 
     // use case 2: both kvs in snapshot
@@ -132,12 +131,12 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     // use case 3: first in snapshot second in kvset
     this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
         CellComparator.COMPARATOR, store, regionServicesForStores);
-    this.memstore.add(kv1.clone());
+    this.memstore.add(kv1.clone(), null);
     // As compaction is starting in the background the repetition
     // of the k1 might be removed BUT the scanners created earlier
     // should look on the OLD MutableCellSetSegment, so this should be OK...
     this.memstore.snapshot();
-    this.memstore.add(kv2.clone());
+    this.memstore.add(kv2.clone(), null);
     verifyScanAcrossSnapshot2(kv1,kv2);
   }
 
@@ -173,7 +172,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     Thread.sleep(1);
     addRows(this.memstore);
     Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
-    assertTrue(KeyValue.COMPARATOR.compareRows(closestToEmpty,
+    assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty,
         new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
     for (int i = 0; i < ROW_COUNT; i++) {
       Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
@@ -181,7 +180,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       if (i + 1 == ROW_COUNT) {
         assertEquals(nr, null);
       } else {
-        assertTrue(KeyValue.COMPARATOR.compareRows(nr,
+        assertTrue(CellComparator.COMPARATOR.compareRows(nr,
             new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
       }
     }
@@ -226,9 +225,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     //Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
     //Pushing to pipeline
     ((CompactingMemStore)memstore).flushInMemory();
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -237,57 +236,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertEquals(3, memstore.getSnapshot().getCellsCount());
     //Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
   }
 
-
-  ////////////////////////////////////
-  //Test for upsert with MSLAB
-  ////////////////////////////////////
-
-  /**
-   * Test a pathological pattern that shows why we can't currently
-   * use the MSLAB for upsert workloads. This test inserts data
-   * in the following pattern:
-   *
-   * - row0001 through row1000 (fills up one 2M Chunk)
-   * - row0002 through row1001 (fills up another 2M chunk, leaves one reference
-   *   to the first chunk
-   * - row0003 through row1002 (another chunk, another dangling reference)
-   *
-   * This causes OOME pretty quickly if we use MSLAB for upsert
-   * since each 2M chunk is held onto by a single reference.
-   */
-  @Override
-  @Test
-  public void testUpsertMSLAB() throws Exception {
-
-    int ROW_SIZE = 2048;
-    byte[] qualifier = new byte[ROW_SIZE - 4];
-
-    MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
-    for (int i = 0; i < 3; i++) { System.gc(); }
-    long usageBefore = bean.getHeapMemoryUsage().getUsed();
-
-    long size = 0;
-    long ts=0;
-
-    for (int newValue = 0; newValue < 1000; newValue++) {
-      for (int row = newValue; row < newValue + 1000; row++) {
-        byte[] rowBytes = Bytes.toBytes(row);
-        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
-      }
-    }
-    System.out.println("Wrote " + ts + " vals");
-    for (int i = 0; i < 3; i++) { System.gc(); }
-    long usageAfter = bean.getHeapMemoryUsage().getUsed();
-    System.out.println("Memory used: " + (usageAfter - usageBefore)
-        + " (heapsize: " + memstore.heapSize() +
-        " size: " + size + ")");
-  }
-
   ////////////////////////////////////
   // Test for periodic memstore flushes
   // based on time of oldest edit
@@ -302,7 +255,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   @Override
   @Test
   public void testUpsertMemstoreSize() throws Exception {
-    long oldSize = memstore.size();
+    MemstoreSize oldSize = memstore.size();
 
     List<Cell> l = new ArrayList<Cell>();
     KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@@ -316,9 +269,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     l.add(kv2);
     l.add(kv3);
 
-    this.memstore.upsert(l, 2);// readpoint is 2
-    long newSize = this.memstore.size();
-    assert (newSize > oldSize);
+    this.memstore.upsert(l, 2, null);// readpoint is 2
+    MemstoreSize newSize = this.memstore.size();
+    assert (newSize.getDataSize() > oldSize.getDataSize());
     //The kv1 should be removed.
     assert (memstore.getActive().getCellsCount() == 2);
 
@@ -326,7 +279,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     kv4.setSequenceId(1);
     l.clear();
     l.add(kv4);
-    this.memstore.upsert(l, 3);
+    this.memstore.upsert(l, 3, null);
     assertEquals(newSize, this.memstore.size());
     //The kv2 should be removed.
     assert (memstore.getActive().getCellsCount() == 2);
@@ -348,7 +301,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       assertEquals(t, Long.MAX_VALUE);
 
       // test the case that the timeOfOldestEdit is updated after a KV add
-      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
       // The method will also assert
@@ -356,7 +309,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       t = runSnapshot(memstore, true);
 
       // test the case that the timeOfOldestEdit is updated after a KV delete
-      memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
      t = runSnapshot(memstore, true);
@@ -366,7 +319,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
       kv1.setSequenceId(100);
       l.add(kv1);
-      memstore.upsert(l, 1000);
+      memstore.upsert(l, 1000, null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
     } finally {
@@ -384,7 +337,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemStoreSnapshot snapshot = hmc.snapshot();
     if (useForce) {
       // Make some assertions about what just happened.
-      assertTrue("History size has not increased", oldHistorySize < snapshot.getSize());
+      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
       long t = hmc.timeOfOldestEdit();
       assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
       hmc.clearSnapshot(snapshot.getId());
@@ -421,9 +374,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
@@ -431,8 +384,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
     memstore.clearSnapshot(snapshot.getId());
 
@@ -456,9 +409,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
@@ -466,8 +419,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
 
     // opening scanner before clear the snapshot
@@ -491,8 +444,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     snapshot = memstore.snapshot();
     // Adding more value
-    memstore.add(new KeyValue(row, fam, qf6, val));
-    memstore.add(new KeyValue(row, fam, qf7, val));
+    memstore.add(new KeyValue(row, fam, qf6, val), null);
+    memstore.add(new KeyValue(row, fam, qf7, val), null);
     // opening scanners
     scanners = memstore.getScanners(0);
     // close scanners before clear the snapshot
@@ -521,9 +474,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, 1, val));
-    memstore.add(new KeyValue(row, fam, qf2, 1, val));
-    memstore.add(new KeyValue(row, fam, qf3, 1, val));
+    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
 
     // Creating a pipeline
     ((CompactingMemStore)memstore).disableCompaction();
@@ -531,8 +484,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf1, 2, val));
-    memstore.add(new KeyValue(row, fam, qf2, 2, val));
+    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
 
     // pipeline bucket 2
@@ -547,9 +500,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf3, 3, val));
-    memstore.add(new KeyValue(row, fam, qf2, 3, val));
-    memstore.add(new KeyValue(row, fam, qf1, 3, val));
+    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
+    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
     assertEquals(3, memstore.getActive().getCellsCount());
 
     assertTrue(chunkPool.getPoolSize() == 0);
@@ -570,8 +523,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     snapshot = memstore.snapshot();
     // Adding more value
-    memstore.add(new KeyValue(row, fam, qf2, 4, val));
-    memstore.add(new KeyValue(row, fam, qf3, 4, val));
+    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
     // opening scanners
     scanners = memstore.getScanners(0);
     // close scanners before clear the snapshot
@@ -597,20 +550,27 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
     // test 1 bucket
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    int totalCellsLen = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so totalCellsLen is adjusted accordingly.
+    totalCellsLen = (totalCellsLen * 3) / 4;
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(3, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
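
To make the new accounting concrete: the assertions above now read two counters, cell data bytes and heap overhead, instead of a single long. Below is a minimal stand-alone sketch of that idea; the class and method names are simplified stand-ins for illustration, not the actual MemstoreSize implementation.

    // Sketch: keep cell data bytes and heap overhead as separate counters, so
    // flush decisions can key off data size while heap usage stays accurate.
    public class MemstoreSizeSketch {
      private long dataSize;      // sum of cell key/value bytes
      private long heapOverhead;  // object headers, map entries, references

      public void incMemstoreSize(long dataSizeDelta, long heapOverheadDelta) {
        this.dataSize += dataSizeDelta;
        this.heapOverhead += heapOverheadDelta;
      }

      public long getDataSize() { return dataSize; }
      public long getHeapOverhead() { return heapOverhead; }

      public static void main(String[] args) {
        MemstoreSizeSketch size = new MemstoreSizeSketch();
        size.incMemstoreSize(100, 48); // one 100-byte cell plus its map entry
        System.out.println(size.getDataSize() + " / " + size.getHeapOverhead());
      }
    }
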
@@ -624,11 +584,13 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
-    addRowsByKeys(memstore, keys1);
+    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
 
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     int counter = 0;
     for ( Segment s : memstore.getSegments()) {
@@ -636,22 +598,32 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     assertEquals(3, counter);
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so totalCellsLen is adjusted accordingly.
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
+
+    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    totalHeapOverhead += 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
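
The expected heapOverhead values in this test are plain per-cell arithmetic: a cell in the active, skip-list-backed segment costs one KeyValue fixed overhead plus one skip-list entry, while after an in-memory flush it costs one KeyValue fixed overhead plus one flat-array entry. A worked example follows; the constants are illustrative placeholders, not the real KeyValue.FIXED_OVERHEAD and ClassSize values.

    // Worked example of the per-cell overhead accounting asserted above.
    public class HeapOverheadExample {
      static final long KV_FIXED_OVERHEAD = 56;   // placeholder
      static final long CSLM_ENTRY = 48;          // placeholder skip-list entry cost
      static final long CELL_ARRAY_MAP_ENTRY = 8; // placeholder array-slot cost

      public static void main(String[] args) {
        int cellsBefore = 4;                      // A, A, B, C in the active segment
        int cellsAfter = 3;                       // compaction drops the duplicate A
        long before = cellsBefore * (KV_FIXED_OVERHEAD + CSLM_ENTRY);
        long after = cellsAfter * (KV_FIXED_OVERHEAD + CELL_ARRAY_MAP_ENTRY);
        System.out.println("overhead before in-memory flush: " + before);
        System.out.println("overhead after compaction:       " + after);
      }
    }
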
@@ -666,33 +638,47 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
 
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, region.getMemstoreSize());
+    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
+    assertEquals(totalCellsLen1, region.getMemstoreSize());
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
 
-    String tstStr = "\n\nFlushable size after first flush in memory:" + size
-        + ". Is MemmStore in compaction?:" + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory();
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
-
-    tstStr += " After adding second part of the keys. Memstore size: " +
-        region.getMemstoreSize() + ", Memstore Total Size: " +
-        regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
-
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    ((CompactingMemStore)memstore).disableCompaction();
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so totalCellsLen is adjusted accordingly.
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    // In-memory flush creates a CellArrayMap instead of a CSLM. See the overhead difference.
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
+
+    int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
+    long totalHeapOverhead2 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
+
+    ((CompactingMemStore) memstore).disableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys3);
-    assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // No change in the cells' data size (i.e. the memstore size) as there is no compaction.
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
+
+    int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    long totalHeapOverhead3 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalHeapOverhead + totalHeapOverhead2 + totalHeapOverhead3,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     ((CompactingMemStore)memstore).enableCompaction();
     size = memstore.getFlushableSize();
@@ -701,34 +687,47 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // Active segment flushed to pipeline and all 3 segments compacted; duplicated cells removed.
+    // Out of the total 10 cells, only 4 are unique.
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
+    totalCellsLen3 = 0;// All duplicated cells.
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    // Only 4 unique cells left
+    assertEquals(4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY),
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
 
     //assertTrue(tstStr, false);
   }
 
-  private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+  private int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf = Bytes.toBytes("testqualifier");
     long size = hmc.getActive().keySize();
+    long heapOverhead = hmc.getActive().heapOverhead();
+    int totalLen = 0;
     for (int i = 0; i < keys.length; i++) {
       long timestamp = System.currentTimeMillis();
       Threads.sleep(1); // to make sure each kv gets a different ts
       byte[] row = Bytes.toBytes(keys[i]);
       byte[] val = Bytes.toBytes(keys[i] + i);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      hmc.add(kv);
+      totalLen += kv.getLength();
+      hmc.add(kv, null);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
     }
-    regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().keySize() - size);
+    regionServicesForStores.addMemstoreSize(new MemstoreSize(hmc.getActive().keySize() - size,
+        hmc.getActive().heapOverhead() - heapOverhead));
+    return totalLen;
   }
 
   private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {

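addRowsByKeys now threads a size accumulator through each add() and applies the collected delta to the region-level counter once at the end. The sketch below shows that call pattern in isolation; SizeDelta, MemStoreStub and PER_CELL_OVERHEAD are hypothetical stand-ins, and a null accumulator means the caller does not want the delta tracked.

    import java.util.ArrayList;
    import java.util.List;

    // Accumulator pattern: the caller owns the delta object, every add() bumps
    // it, and the total is applied to the region counter in one shot.
    public class AccumulatorPattern {
      static class SizeDelta {
        long dataSize, heapOverhead;
        void inc(long data, long heap) { dataSize += data; heapOverhead += heap; }
      }

      static class MemStoreStub {
        static final long PER_CELL_OVERHEAD = 104; // placeholder figure
        final List<byte[]> cells = new ArrayList<>();

        void add(byte[] cell, SizeDelta delta) {   // null delta means "don't track"
          cells.add(cell);
          if (delta != null) {
            delta.inc(cell.length, PER_CELL_OVERHEAD);
          }
        }
      }

      public static void main(String[] args) {
        MemStoreStub memstore = new MemStoreStub();
        SizeDelta delta = new SizeDelta();
        for (String key : new String[] { "A", "B", "C" }) {
          memstore.add(key.getBytes(), delta);
        }
        // The caller would now apply the delta to the region-level counter.
        System.out.println("data=" + delta.dataSize + " overhead=" + delta.heapOverhead);
      }
    }
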
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 6499251..c72cae3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.Threads;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -78,27 +78,34 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
     // test 1 bucket
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    long totalCellsLen = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     assertEquals(4, memstore.getActive().getCellsCount());
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so totalCellsLen is adjusted accordingly.
+    totalCellsLen = (totalCellsLen * 3) / 4;
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
     for ( Segment s : memstore.getSegments()) {
       counter += s.getCellsCount();
     }
     assertEquals(3, counter);
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(3, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
@@ -108,13 +115,12 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
-    long size = memstore.getFlushableSize();
-
-//    assertTrue(
-//        "\n\n<<< This is the active size with 4 keys - " + memstore.getActive().getSize()
-//            + ". This is the memstore flushable size - " + size + "\n",false);
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead1 = 4
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
+    MemstoreSize size = memstore.getFlushableSize();
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
@@ -126,10 +132,19 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     assertEquals(3,counter);
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so totalCellsLen1 is adjusted accordingly.
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapOverhead1 = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
+
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    long totalHeapOverhead2 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
@@ -147,14 +162,18 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
       counter += s.getCellsCount();
     }
     assertEquals(4,counter);
-    assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead2 = 1 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
@@ -165,36 +184,49 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
 
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, region.getMemstoreSize());
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead1 = 4
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1, region.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
 
-    String tstStr = "\n\nFlushable size after first flush in memory:" + size + ". Is MemmStore in compaction?:"
-        + ((CompactingMemStore) memstore).isMemStoreFlushingInMemory();
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so totalCellsLen1 is adjusted accordingly.
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapOverhead1 = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
 
-    tstStr += " After adding second part of the keys. Memstore size: " +
-        region.getMemstoreSize() + ", Memstore Total Size: " +
-        regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    long totalHeapOverhead2 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
 
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     ((CompactingMemStore) memstore).disableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys3);
-    assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
+
+    long totalCellsLen3 = addRowsByKeys(memstore, keys3);
+    long totalHeapOverhead3 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2 + totalHeapOverhead3,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     ((CompactingMemStore) memstore).enableCompaction();
     size = memstore.getFlushableSize();
@@ -203,14 +235,22 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // Active segment flushed to pipeline and all 3 segments compacted; duplicated cells removed.
+    // Out of the total 10 cells, only 4 are unique.
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
+    totalCellsLen3 = 0;// All duplicated cells.
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    // Only 4 unique cells left
+    assertEquals(4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY),
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
 
@@ -339,24 +379,25 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
         byte[] qf = Bytes.toBytes("testqualifier"+j);
         byte[] val = Bytes.toBytes(keys[i] + j);
         KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-        hmc.add(kv);
+        hmc.add(kv, null);
       }
     }
   }
 
-  private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+  private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf = Bytes.toBytes("testqualifier");
-    long size = hmc.getActive().size();//
+    MemstoreSize memstoreSize = new MemstoreSize();
     for (int i = 0; i < keys.length; i++) {
       long timestamp = System.currentTimeMillis();
       Threads.sleep(1); // to make sure each kv gets a different ts
       byte[] row = Bytes.toBytes(keys[i]);
       byte[] val = Bytes.toBytes(keys[i] + i);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      hmc.add(kv);
+      hmc.add(kv, memstoreSize);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
     }
-    regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);//
+    regionServicesForStores.addMemstoreSize(memstoreSize);
+    return memstoreSize.getDataSize();
   }
 }

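The data-size adjustments in these assertions lean on one property of the test data: every cell in a batch has the same encoded length, so the fraction of surviving cells maps directly onto surviving bytes. A small worked example with made-up byte counts:

    // Worked example of the duplicate-elimination arithmetic used above.
    public class DedupSizeExample {
      public static void main(String[] args) {
        long totalCellsLen1 = 400;                  // 4 equal-sized cells: A, A, B, C
        totalCellsLen1 = (totalCellsLen1 * 3) / 4;  // compaction keeps 3 of the 4
        long totalCellsLen2 = 300;                  // 3 equal-sized cells: A, B, D
        totalCellsLen2 = totalCellsLen2 / 3;        // A and B are duplicates; only D survives
        System.out.println("expected memstore data size: "
            + (totalCellsLen1 + totalCellsLen2));   // 300 + 100 = 400
      }
    }
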

[2/4] hbase git commit: HBASE-16747 Track memstore data size and heap overhead separately.

Posted by an...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 11f43d5..433388d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -105,10 +105,10 @@ public class TestDefaultMemStore {
   public void testPutSameKey() {
     byte[] bytes = Bytes.toBytes(getName());
     KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
-    this.memstore.add(kv);
+    this.memstore.add(kv, null);
     byte[] other = Bytes.toBytes("somethingelse");
     KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
-    this.memstore.add(samekey);
+    this.memstore.add(samekey, null);
     Cell found = this.memstore.getActive().first();
     assertEquals(1, this.memstore.getActive().getCellsCount());
     assertTrue(Bytes.toString(found.getValueArray()), CellUtil.matchingValue(samekey, found));
@@ -118,23 +118,28 @@ public class TestDefaultMemStore {
   public void testPutSameCell() {
     byte[] bytes = Bytes.toBytes(getName());
     KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
-    long sizeChangeForFirstCell = this.memstore.add(kv);
-    long sizeChangeForSecondCell = this.memstore.add(kv);
+    MemstoreSize sizeChangeForFirstCell = new MemstoreSize();
+    this.memstore.add(kv, sizeChangeForFirstCell);
+    MemstoreSize sizeChangeForSecondCell = new MemstoreSize();
+    this.memstore.add(kv, sizeChangeForSecondCell);
     // make sure memstore size increase won't double-count MSLAB chunk size
-    assertEquals(AbstractMemStore.heapSizeChange(kv, true), sizeChangeForFirstCell);
+    assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize());
+    assertEquals(this.memstore.active.heapOverheadChange(kv, true),
+        sizeChangeForFirstCell.getHeapOverhead());
     Segment segment = this.memstore.getActive();
     MemStoreLAB msLab = segment.getMemStoreLAB();
     if (msLab != null) {
       // make sure memstore size increased even when writing the same cell, if using MSLAB
-      assertEquals(segment.getCellLength(kv), sizeChangeForSecondCell);
+      assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
       // make sure chunk size increased even when writing the same cell, if using MSLAB
       if (msLab instanceof HeapMemStoreLAB) {
-        assertEquals(2 * segment.getCellLength(kv),
+        assertEquals(2 * Segment.getCellLength(kv),
           ((HeapMemStoreLAB) msLab).getCurrentChunk().getNextFreeOffset());
       }
     } else {
       // make sure no memstore size change w/o MSLAB
-      assertEquals(0, sizeChangeForSecondCell);
+      assertEquals(0, sizeChangeForSecondCell.getDataSize());
+      assertEquals(0, sizeChangeForSecondCell.getHeapOverhead());
     }
   }
 
@@ -244,8 +249,8 @@ public class TestDefaultMemStore {
     final KeyValue kv2 = new KeyValue(two, f, q, v);
 
     // use case 1: both kvs in kvset
-    this.memstore.add(kv1.clone());
-    this.memstore.add(kv2.clone());
+    this.memstore.add(kv1.clone(), null);
+    this.memstore.add(kv2.clone(), null);
     verifyScanAcrossSnapshot2(kv1, kv2);
 
     // use case 2: both kvs in snapshot
@@ -254,9 +259,9 @@ public class TestDefaultMemStore {
 
     // use case 3: first in snapshot second in kvset
     this.memstore = new DefaultMemStore();
-    this.memstore.add(kv1.clone());
+    this.memstore.add(kv1.clone(), null);
     this.memstore.snapshot();
-    this.memstore.add(kv2.clone());
+    this.memstore.add(kv2.clone(), null);
     verifyScanAcrossSnapshot2(kv1, kv2);
   }
 
@@ -302,7 +307,7 @@ public class TestDefaultMemStore {
 
     KeyValue kv1 = new KeyValue(row, f, q1, v);
     kv1.setSequenceId(w.getWriteNumber());
-    memstore.add(kv1);
+    memstore.add(kv1, null);
 
     KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{});
@@ -315,7 +320,7 @@ public class TestDefaultMemStore {
     w = mvcc.begin();
     KeyValue kv2 = new KeyValue(row, f, q2, v);
     kv2.setSequenceId(w.getWriteNumber());
-    memstore.add(kv2);
+    memstore.add(kv2, null);
 
     s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv1});
@@ -347,11 +352,11 @@ public class TestDefaultMemStore {
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
     kv11.setSequenceId(w.getWriteNumber());
-    memstore.add(kv11);
+    memstore.add(kv11, null);
 
     KeyValue kv12 = new KeyValue(row, f, q2, v1);
     kv12.setSequenceId(w.getWriteNumber());
-    memstore.add(kv12);
+    memstore.add(kv12, null);
     mvcc.completeAndWait(w);
 
     // BEFORE STARTING INSERT 2, SEE FIRST KVS
@@ -362,11 +367,11 @@ public class TestDefaultMemStore {
     w = mvcc.begin();
     KeyValue kv21 = new KeyValue(row, f, q1, v2);
     kv21.setSequenceId(w.getWriteNumber());
-    memstore.add(kv21);
+    memstore.add(kv21, null);
 
     KeyValue kv22 = new KeyValue(row, f, q2, v2);
     kv22.setSequenceId(w.getWriteNumber());
-    memstore.add(kv22);
+    memstore.add(kv22, null);
 
     // BEFORE COMPLETING INSERT 2, SEE FIRST KVS
     s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
@@ -400,11 +405,11 @@ public class TestDefaultMemStore {
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
     kv11.setSequenceId(w.getWriteNumber());
-    memstore.add(kv11);
+    memstore.add(kv11, null);
 
     KeyValue kv12 = new KeyValue(row, f, q2, v1);
     kv12.setSequenceId(w.getWriteNumber());
-    memstore.add(kv12);
+    memstore.add(kv12, null);
     mvcc.completeAndWait(w);
 
     // BEFORE STARTING INSERT 2, SEE FIRST KVS
@@ -416,7 +421,7 @@ public class TestDefaultMemStore {
     KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
         KeyValue.Type.DeleteColumn);
     kvDel.setSequenceId(w.getWriteNumber());
-    memstore.add(kvDel);
+    memstore.add(kvDel, null);
 
     // BEFORE COMPLETING DELETE, SEE FIRST KVS
     s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
@@ -478,7 +483,7 @@ public class TestDefaultMemStore {
 
         KeyValue kv = new KeyValue(row, f, q1, i, v);
         kv.setSequenceId(w.getWriteNumber());
-        memstore.add(kv);
+        memstore.add(kv, null);
         mvcc.completeAndWait(w);
 
         // Assert that we can read back
@@ -543,9 +548,9 @@ public class TestDefaultMemStore {
     KeyValue key1 = new KeyValue(row, family, qf, stamps[1], values[1]);
     KeyValue key2 = new KeyValue(row, family, qf, stamps[2], values[2]);
 
-    m.add(key0);
-    m.add(key1);
-    m.add(key2);
+    m.add(key0, null);
+    m.add(key1, null);
+    m.add(key2, null);
 
     assertTrue("Expected memstore to hold 3 values, actually has " +
         m.getActive().getCellsCount(), m.getActive().getCellsCount() == 3);
@@ -619,16 +624,16 @@ public class TestDefaultMemStore {
     byte [] val = Bytes.toBytes("testval");
 
     //Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
     //Creating a snapshot
     memstore.snapshot();
     assertEquals(3, memstore.getSnapshot().getCellsCount());
     //Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam ,qf4, val));
-    memstore.add(new KeyValue(row, fam ,qf5, val));
+    memstore.add(new KeyValue(row, fam ,qf4, val), null);
+    memstore.add(new KeyValue(row, fam ,qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
   }
 
@@ -648,14 +653,14 @@ public class TestDefaultMemStore {
     KeyValue put2 = new KeyValue(row, fam, qf1, ts2, val);
     long ts3 = ts2 + 1;
     KeyValue put3 = new KeyValue(row, fam, qf1, ts3, val);
-    memstore.add(put1);
-    memstore.add(put2);
-    memstore.add(put3);
+    memstore.add(put1, null);
+    memstore.add(put2, null);
+    memstore.add(put3, null);
 
     assertEquals(3, memstore.getActive().getCellsCount());
 
     KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
-    memstore.delete(del2);
+    memstore.add(del2, null);
 
     List<Cell> expected = new ArrayList<Cell>();
     expected.add(put3);
@@ -683,15 +688,15 @@ public class TestDefaultMemStore {
     KeyValue put2 = new KeyValue(row, fam, qf1, ts2, val);
     long ts3 = ts2 + 1;
     KeyValue put3 = new KeyValue(row, fam, qf1, ts3, val);
-    memstore.add(put1);
-    memstore.add(put2);
-    memstore.add(put3);
+    memstore.add(put1, null);
+    memstore.add(put2, null);
+    memstore.add(put3, null);
 
     assertEquals(3, memstore.getActive().getCellsCount());
 
     KeyValue del2 =
       new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
-    memstore.delete(del2);
+    memstore.add(del2, null);
 
     List<Cell> expected = new ArrayList<Cell>();
     expected.add(put3);
@@ -721,14 +726,14 @@ public class TestDefaultMemStore {
     KeyValue put3 = new KeyValue(row, fam, qf3, ts, val);
     KeyValue put4 = new KeyValue(row, fam, qf3, ts+1, val);
 
-    memstore.add(put1);
-    memstore.add(put2);
-    memstore.add(put3);
-    memstore.add(put4);
+    memstore.add(put1, null);
+    memstore.add(put2, null);
+    memstore.add(put3, null);
+    memstore.add(put4, null);
 
     KeyValue del =
       new KeyValue(row, fam, null, ts, KeyValue.Type.DeleteFamily, val);
-    memstore.delete(del);
+    memstore.add(del, null);
 
     List<Cell> expected = new ArrayList<Cell>();
     expected.add(del);
@@ -751,9 +756,9 @@ public class TestDefaultMemStore {
     byte [] qf = Bytes.toBytes("testqualifier");
     byte [] val = Bytes.toBytes("testval");
     long ts = System.nanoTime();
-    memstore.add(new KeyValue(row, fam, qf, ts, val));
+    memstore.add(new KeyValue(row, fam, qf, ts, val), null);
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
-    memstore.delete(delete);
+    memstore.add(delete, null);
     assertEquals(2, memstore.getActive().getCellsCount());
     assertEquals(delete, memstore.getActive().first());
   }
@@ -761,12 +766,12 @@ public class TestDefaultMemStore {
   @Test
   public void testRetainsDeleteVersion() throws IOException {
     // add a put to memstore
-    memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"));
+    memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"), null);
 
     // now process a specific delete:
     KeyValue delete = KeyValueTestUtil.create(
         "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
-    memstore.delete(delete);
+    memstore.add(delete, null);
 
     assertEquals(2, memstore.getActive().getCellsCount());
     assertEquals(delete, memstore.getActive().first());
@@ -775,12 +780,12 @@ public class TestDefaultMemStore {
   @Test
   public void testRetainsDeleteColumn() throws IOException {
     // add a put to memstore
-    memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"));
+    memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"), null);
 
     // now process a specific delete:
     KeyValue delete = KeyValueTestUtil.create("row1", "fam", "a", 100,
         KeyValue.Type.DeleteColumn, "dont-care");
-    memstore.delete(delete);
+    memstore.add(delete, null);
 
     assertEquals(2, memstore.getActive().getCellsCount());
     assertEquals(delete, memstore.getActive().first());
@@ -789,64 +794,17 @@ public class TestDefaultMemStore {
   @Test
   public void testRetainsDeleteFamily() throws IOException {
     // add a put to memstore
-    memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"));
+    memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"), null);
 
     // now process a specific delete:
     KeyValue delete = KeyValueTestUtil.create("row1", "fam", "a", 100,
         KeyValue.Type.DeleteFamily, "dont-care");
-    memstore.delete(delete);
+    memstore.add(delete, null);
 
     assertEquals(2, memstore.getActive().getCellsCount());
     assertEquals(delete, memstore.getActive().first());
   }
 
-  ////////////////////////////////////
-  //Test for upsert with MSLAB
-  ////////////////////////////////////
-
-  /**
-   * Test a pathological pattern that shows why we can't currently
-   * use the MSLAB for upsert workloads. This test inserts data
-   * in the following pattern:
-   *
-   * - row0001 through row1000 (fills up one 2M Chunk)
-   * - row0002 through row1001 (fills up another 2M chunk, leaves one reference
-   *   to the first chunk
-   * - row0003 through row1002 (another chunk, another dangling reference)
-   *
-   * This causes OOME pretty quickly if we use MSLAB for upsert
-   * since each 2M chunk is held onto by a single reference.
-   */
-  @Test
-  public void testUpsertMSLAB() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
-    memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
-
-    int ROW_SIZE = 2048;
-    byte[] qualifier = new byte[ROW_SIZE - 4];
-
-    MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
-    for (int i = 0; i < 3; i++) { System.gc(); }
-    long usageBefore = bean.getHeapMemoryUsage().getUsed();
-
-    long size = 0;
-    long ts=0;
-
-    for (int newValue = 0; newValue < 1000; newValue++) {
-      for (int row = newValue; row < newValue + 1000; row++) {
-        byte[] rowBytes = Bytes.toBytes(row);
-        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
-      }
-    }
-    System.out.println("Wrote " + ts + " vals");
-    for (int i = 0; i < 3; i++) { System.gc(); }
-    long usageAfter = bean.getHeapMemoryUsage().getUsed();
-    System.out.println("Memory used: " + (usageAfter - usageBefore)
-        + " (heapsize: " + memstore.heapSize() +
-        " size: " + size + ")");
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
   //////////////////////////////////////////////////////////////////////////////
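
The removed testUpsertMSLAB is worth a note: it documented how, under an upsert workload, each pass fills a fresh 2M MSLAB chunk while a single still-referenced cell per pass pins the whole chunk, so retained heap grows far faster than live data. A toy model of that retention pattern, using plain arrays rather than any HBase class:

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the MSLAB upsert hazard: one live reference per round keeps
    // an entire 2M chunk reachable, so retained heap grows without bound even
    // though the set of live cells stays small.
    public class ChunkRetentionSketch {
      static final int CHUNK_SIZE = 2 * 1024 * 1024; // 2M, as in the removed test

      public static void main(String[] args) {
        List<byte[]> danglingRefs = new ArrayList<>();
        for (int round = 0; round < 8; round++) {
          byte[] chunk = new byte[CHUNK_SIZE]; // freshly filled chunk for this pass
          // Most of the chunk is garbage after the upsert, but one cell copied
          // into it is still referenced from the memstore index.
          danglingRefs.add(chunk);
        }
        System.out.println("pinned chunks: " + danglingRefs.size()
            + ", retained heap: " + danglingRefs.size() * (long) CHUNK_SIZE + " bytes");
      }
    }
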
@@ -864,7 +822,7 @@ public class TestDefaultMemStore {
   public void testUpsertMemstoreSize() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
-    long oldSize = memstore.size();
+    MemstoreSize oldSize = memstore.size();
 
     List<Cell> l = new ArrayList<Cell>();
     KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@@ -874,16 +832,16 @@ public class TestDefaultMemStore {
     kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
     l.add(kv1); l.add(kv2); l.add(kv3);
 
-    this.memstore.upsert(l, 2);// readpoint is 2
-    long newSize = this.memstore.size();
-    assert(newSize > oldSize);
+    this.memstore.upsert(l, 2, null);// readpoint is 2
+    MemstoreSize newSize = this.memstore.size();
+    assert (newSize.getDataSize() > oldSize.getDataSize());
     //The kv1 should be removed.
     assert(memstore.getActive().getCellsCount() == 2);
 
     KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
     kv4.setSequenceId(1);
     l.clear(); l.add(kv4);
-    this.memstore.upsert(l, 3);
+    this.memstore.upsert(l, 3, null);
     assertEquals(newSize, this.memstore.size());
     //The kv2 should be removed.
     assert(memstore.getActive().getCellsCount() == 2);
@@ -910,7 +868,7 @@ public class TestDefaultMemStore {
       assertEquals(t, Long.MAX_VALUE);
 
       // test the case that the timeOfOldestEdit is updated after a KV add
-      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
       // snapshot() will reset timeOfOldestEdit. The method will also assert the
@@ -918,7 +876,7 @@ public class TestDefaultMemStore {
       t = runSnapshot(memstore);
 
       // test the case that the timeOfOldestEdit is updated after a KV delete
-      memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
       t = runSnapshot(memstore);
@@ -928,7 +886,7 @@ public class TestDefaultMemStore {
       KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
       kv1.setSequenceId(100);
       l.add(kv1);
-      memstore.upsert(l, 1000);
+      memstore.upsert(l, 1000, null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
     } finally {
@@ -1041,7 +999,7 @@ public class TestDefaultMemStore {
       for (int ii = 0; ii < QUALIFIER_COUNT; ii++) {
         byte [] row = Bytes.toBytes(i);
         byte [] qf = makeQualifier(i, ii);
-        hmc.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
+        hmc.add(new KeyValue(row, FAMILY, qf, timestamp, qf), null);
       }
     }
     return ROW_COUNT;
@@ -1088,7 +1046,7 @@ public class TestDefaultMemStore {
       for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) {
         byte [] row = Bytes.toBytes(i);
         byte [] qf = makeQualifier(i, ii);
-        mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
+        mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf), null);
       }
     }
   }

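Throughout this file the old memstore.delete(kv) calls become plain add() calls carrying a Delete-type KeyValue, reflecting that a tombstone is just another cell version in the memstore. The sketch below shows the constructor shape these tests use; it assumes the hbase-common classes are on the classpath and is illustrative rather than part of the patch.

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    // A delete is written like a put: same cell coordinates, Delete type byte.
    public class TombstoneSketch {
      public static KeyValue tombstone(byte[] row, byte[] fam, byte[] qf, long ts, byte[] val) {
        return new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
      }

      public static void main(String[] args) {
        KeyValue del = tombstone(Bytes.toBytes("row1"), Bytes.toBytes("fam"),
            Bytes.toBytes("q"), 100L, Bytes.toBytes("v"));
        System.out.println(del);
        // With this patch the caller would do: memstore.add(del, null);
      }
    }
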
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index dfc97e9..283ae7c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -179,7 +180,7 @@ public class TestHMobStore {
     KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
     int maxKeyCount = keys.length;
     StoreFileWriter mobWriter = store.createWriterInTmp(currentDate, maxKeyCount,
-        hcd.getCompactionCompression(), region.getRegionInfo().getStartKey());
+        hcd.getCompactionCompressionType(), region.getRegionInfo().getStartKey());
     mobFilePath = mobWriter.getPath();
 
     mobWriter.append(key1);
@@ -209,12 +210,12 @@ public class TestHMobStore {
     init(name.getMethodName(), conf, false);
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, value));
-    this.store.add(new KeyValue(row, family, qf2, 1, value));
-    this.store.add(new KeyValue(row, family, qf3, 1, value));
-    this.store.add(new KeyValue(row, family, qf4, 1, value));
-    this.store.add(new KeyValue(row, family, qf5, 1, value));
-    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    this.store.add(new KeyValue(row, family, qf1, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf3, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf5, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, value), null);
 
     Scan scan = new Scan(get);
     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
@@ -223,7 +224,7 @@ public class TestHMobStore {
 
     List<Cell> results = new ArrayList<Cell>();
     scanner.next(results);
-    Collections.sort(results, KeyValue.COMPARATOR);
+    Collections.sort(results, CellComparator.COMPARATOR);
     scanner.close();
 
     //Compare
@@ -244,20 +245,20 @@ public class TestHMobStore {
     init(name.getMethodName(), conf, false);
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, value));
-    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    this.store.add(new KeyValue(row, family, qf1, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, value), null);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, value));
-    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    this.store.add(new KeyValue(row, family, qf3, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, value), null);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, value));
-    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    this.store.add(new KeyValue(row, family, qf5, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, value), null);
     //flush
     flush(3);
 
@@ -268,7 +269,7 @@ public class TestHMobStore {
 
     List<Cell> results = new ArrayList<Cell>();
     scanner.next(results);
-    Collections.sort(results, KeyValue.COMPARATOR);
+    Collections.sort(results, CellComparator.COMPARATOR);
     scanner.close();
 
     //Compare
@@ -288,20 +289,20 @@ public class TestHMobStore {
     init(name.getMethodName(), conf, false);
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, value));
-    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    this.store.add(new KeyValue(row, family, qf1, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, value), null);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, value));
-    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    this.store.add(new KeyValue(row, family, qf3, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, value), null);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, value));
-    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    this.store.add(new KeyValue(row, family, qf5, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, value), null);
     //flush
     flush(3);
 
@@ -313,7 +314,7 @@ public class TestHMobStore {
 
     List<Cell> results = new ArrayList<Cell>();
     scanner.next(results);
-    Collections.sort(results, KeyValue.COMPARATOR);
+    Collections.sort(results, CellComparator.COMPARATOR);
     scanner.close();
 
     //Compare
@@ -336,20 +337,20 @@ public class TestHMobStore {
     init(name.getMethodName(), conf, false);
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, value));
-    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    this.store.add(new KeyValue(row, family, qf1, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, value), null);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, value));
-    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    this.store.add(new KeyValue(row, family, qf3, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, value), null);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, value));
-    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    this.store.add(new KeyValue(row, family, qf5, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, value), null);
 
     Scan scan = new Scan(get);
     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
@@ -358,7 +359,7 @@ public class TestHMobStore {
 
     List<Cell> results = new ArrayList<Cell>();
     scanner.next(results);
-    Collections.sort(results, KeyValue.COMPARATOR);
+    Collections.sort(results, CellComparator.COMPARATOR);
     scanner.close();
 
     //Compare
@@ -385,20 +386,20 @@ public class TestHMobStore {
     init(name.getMethodName(), conf, hcd, false);
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, value));
-    this.store.add(new KeyValue(row, family, qf2, 1, value));
+    this.store.add(new KeyValue(row, family, qf1, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, value), null);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, value));
-    this.store.add(new KeyValue(row, family, qf4, 1, value));
+    this.store.add(new KeyValue(row, family, qf3, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, value), null);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, value));
-    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    this.store.add(new KeyValue(row, family, qf5, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, value), null);
     //flush
     flush(3);
 
@@ -410,7 +411,7 @@ public class TestHMobStore {
 
     List<Cell> results = new ArrayList<Cell>();
     scanner.next(results);
-    Collections.sort(results, KeyValue.COMPARATOR);
+    Collections.sort(results, CellComparator.COMPARATOR);
     scanner.close();
 
     //Compare
@@ -505,14 +506,14 @@ public class TestHMobStore {
 
     init(name.getMethodName(), conf, hcd, false);
 
-    this.store.add(new KeyValue(row, family, qf1, 1, value));
-    this.store.add(new KeyValue(row, family, qf2, 1, value));
-    this.store.add(new KeyValue(row, family, qf3, 1, value));
+    this.store.add(new KeyValue(row, family, qf1, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf3, 1, value), null);
     flush(1);
 
-    this.store.add(new KeyValue(row, family, qf4, 1, value));
-    this.store.add(new KeyValue(row, family, qf5, 1, value));
-    this.store.add(new KeyValue(row, family, qf6, 1, value));
+    this.store.add(new KeyValue(row, family, qf4, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf5, 1, value), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, value), null);
     flush(2);
 
     Collection<StoreFile> storefiles = this.store.getStorefiles();
@@ -526,7 +527,7 @@ public class TestHMobStore {
 
     List<Cell> results = new ArrayList<Cell>();
     scanner.next(results);
-    Collections.sort(results, KeyValue.COMPARATOR);
+    Collections.sort(results, CellComparator.COMPARATOR);
     scanner.close();
     Assert.assertEquals(expected.size(), results.size());
     for(int i=0; i<results.size(); i++) {

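Note on the TestHMobStore hunks above: two mechanical migrations run through the whole file. Cell sorting moves from KeyValue.COMPARATOR to the cell-level CellComparator.COMPARATOR, and Store.add(Cell) gains a second MemstoreSize parameter through which a caller can collect the size delta of the write; tests that never assert on sizes simply pass null. A minimal sketch of the new contract, restricted to calls visible in this diff (store, row, family, qf1/qf2 and value are the test's own fixtures):

    // Callers that care about accounting pass an accumulator; others pass null.
    MemstoreSize accounting = new MemstoreSize();
    this.store.add(new KeyValue(row, family, qf1, 1, value), accounting); // counted
    this.store.add(new KeyValue(row, family, qf2, 1, value), null);       // opted out
    // Only the cell-data component is read here; heap overhead is kept as a
    // separate counter inside MemstoreSize.
    Assert.assertTrue(accounting.getDataSize() > 0);
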
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 0b8eefa..3e5b127 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -150,7 +150,6 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -356,7 +355,7 @@ public class TestHRegion {
     } finally {
       assertTrue("The regionserver should have thrown an exception", threwIOE);
     }
-    long sz = store.getFlushableSize();
+    long sz = store.getSizeToFlush().getDataSize();
     assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
@@ -400,7 +399,7 @@ public class TestHRegion {
     assertTrue(onePutSize > 0);
     region.flush(true);
     assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
-    assertEquals("flushable size should be zero", 0, store.getFlushableSize());
+    assertEquals("flushable size should be zero", 0, store.getSizeToFlush().getDataSize());
 
     // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
     RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
@@ -411,13 +410,14 @@ public class TestHRegion {
     region.put(put);
     region.flush(true);
     assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize());
-    assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize());
+    assertEquals("flushable size should NOT be zero", onePutSize,
+        store.getSizeToFlush().getDataSize());
 
     // set normalCPHost and flush again, the snapshot will be flushed
     region.setCoprocessorHost(normalCPHost);
     region.flush(true);
     assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
-    assertEquals("flushable size should be zero", 0, store.getFlushableSize());
+    assertEquals("flushable size should be zero", 0, store.getSizeToFlush().getDataSize());
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
@@ -452,9 +452,10 @@ public class TestHRegion {
       fail("Should have failed with IOException");
     } catch (IOException expected) {
     }
-    long expectedSize = onePutSize * 2 - ClassSize.ARRAY;
+    long expectedSize = onePutSize * 2;
     assertEquals("memstoreSize should be incremented", expectedSize, region.getMemstoreSize());
-    assertEquals("flushable size should be incremented", expectedSize, store.getFlushableSize());
+    assertEquals("flushable size should be incremented", expectedSize,
+        store.getSizeToFlush().getDataSize());
 
     region.setCoprocessorHost(null);
     HBaseTestingUtility.closeRegionAndWAL(region);
@@ -524,14 +525,14 @@ public class TestHRegion {
           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
           region.put(p2);
-          long expectedSize = sizeOfOnePut * 3- ClassSize.ARRAY;
+          long expectedSize = sizeOfOnePut * 3;
           Assert.assertEquals(expectedSize, region.getMemstoreSize());
           // Do a successful flush.  It will clear the snapshot only.  That's how flushes work.
           // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
           // it
           region.flush(true);
           // Make sure our memory accounting is right.
-          Assert.assertEquals(sizeOfOnePut * 2 - ClassSize.ARRAY, region.getMemstoreSize());
+          Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize());
         } finally {
           HBaseTestingUtility.closeRegionAndWAL(region);
         }

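The TestHRegion changes show the arithmetic simplification the split accounting buys: once heap overhead is tracked apart from data size, the ClassSize.ARRAY correction terms disappear and N puts are expected to occupy exactly N times one put's data size, at both region and store level. A sketch of the resulting expectation, assuming onePutSize was measured from a single put as in the test above:

    // Two puts of identical cells: the expected size is a plain multiple, with
    // no ClassSize.ARRAY adjustment for the backing array's header.
    long expectedSize = onePutSize * 2;
    assertEquals("memstoreSize should be incremented", expectedSize, region.getMemstoreSize());
    assertEquals("flushable size should be incremented", expectedSize,
        store.getSizeToFlush().getDataSize());
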
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index bcdbff6..fe0d350 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -955,8 +955,8 @@ public class TestHRegionReplayEvents {
       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
     }
     Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
-    long newSnapshotSize = store.getSnapshotSize();
-    assertTrue(newSnapshotSize == 0);
+    MemstoreSize newSnapshotSize = store.getSizeOfSnapshot();
+    assertTrue(newSnapshotSize.getDataSize() == 0);
 
     // assert that the region memstore is empty
     long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();

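In TestHRegionReplayEvents the long-valued getSnapshotSize() becomes getSizeOfSnapshot(), returning a MemstoreSize. Emptiness checks change meaning with it: the old single counter reported fixed DEEP_OVERHEAD bytes even for an empty memstore, whereas emptiness is now a zero data size, or equality with MemstoreSize.EMPTY_SIZE as later hunks in this patch use. A sketch combining both styles seen in the patch:

    // An empty snapshot holds no cell data, whatever its residual heap footprint.
    MemstoreSize newSnapshotSize = store.getSizeOfSnapshot();
    assertTrue(newSnapshotSize.getDataSize() == 0);
    // Equivalent check against the canonical empty value, as used elsewhere here.
    assertEquals(MemstoreSize.EMPTY_SIZE, store.getSizeOfMemStore());
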
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index cd4630a..1d7aab1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -117,9 +117,9 @@ public class TestMemStoreChunkPool {
     DefaultMemStore memstore = new DefaultMemStore();
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
@@ -127,8 +127,8 @@ public class TestMemStoreChunkPool {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
     memstore.clearSnapshot(snapshot.getId());
 
@@ -154,9 +154,9 @@ public class TestMemStoreChunkPool {
     DefaultMemStore memstore = new DefaultMemStore();
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
@@ -164,8 +164,8 @@ public class TestMemStoreChunkPool {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
 
     // opening scanner before clearing the snapshot
@@ -188,8 +188,8 @@ public class TestMemStoreChunkPool {
     // Creating another snapshot
     snapshot = memstore.snapshot();
     // Adding more value
-    memstore.add(new KeyValue(row, fam, qf6, val));
-    memstore.add(new KeyValue(row, fam, qf7, val));
+    memstore.add(new KeyValue(row, fam, qf6, val), null);
+    memstore.add(new KeyValue(row, fam, qf7, val), null);
     // opening scanners
     scanners = memstore.getScanners(0);
     // close scanners before clearing the snapshot

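The TestMemStoreChunkPool hunks apply the same add-signature migration to a bare DefaultMemStore. A condensed sketch of the lifecycle the test exercises, restricted to calls that appear in these hunks (row, fam, qf1/qf2 and val are the test's fixtures):

    DefaultMemStore memstore = new DefaultMemStore();
    memstore.add(new KeyValue(row, fam, qf1, val), null);   // size delta not needed
    MemStoreSnapshot snapshot = memstore.snapshot();        // active set becomes the snapshot
    assertEquals(0, memstore.getActive().getCellsCount());  // fresh, empty active set
    memstore.add(new KeyValue(row, fam, qf2, val), null);   // lands in the new active set
    memstore.clearSnapshot(snapshot.getId());               // drops the snapshot so its
                                                            // chunks can return to the pool
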
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index f0f8c39..5157868 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -128,7 +128,7 @@ public class TestPerColumnFamilyFlush {
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
-      100 * 1024);
+      40 * 1024);
     // Initialize the region
     Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
@@ -151,9 +151,9 @@ public class TestPerColumnFamilyFlush {
     long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
 
     // Find the sizes of the memstores of each CF.
-    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSize = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSize = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSize = region.getStore(FAMILY3).getSizeOfMemStore();
 
     // Get the overall smallest LSN in the region's memstores.
     long smallestSeqInRegionCurrentMemstore = getWAL(region)
@@ -166,34 +166,33 @@ public class TestPerColumnFamilyFlush {
     // Some other sanity checks.
     assertTrue(smallestSeqCF1 < smallestSeqCF2);
     assertTrue(smallestSeqCF2 < smallestSeqCF3);
-    assertTrue(cf1MemstoreSize > 0);
-    assertTrue(cf2MemstoreSize > 0);
-    assertTrue(cf3MemstoreSize > 0);
+    assertTrue(cf1MemstoreSize.getDataSize() > 0);
+    assertTrue(cf2MemstoreSize.getDataSize() > 0);
+    assertTrue(cf3MemstoreSize.getDataSize() > 0);
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    assertEquals(
-        totalMemstoreSize + (3 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
-        cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize);
+    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
+        + cf3MemstoreSize.getDataSize());
 
     // Flush!
     region.flush(false);
 
     // Will use these to check if anything changed.
-    long oldCF2MemstoreSize = cf2MemstoreSize;
-    long oldCF3MemstoreSize = cf3MemstoreSize;
+    MemstoreSize oldCF2MemstoreSize = cf2MemstoreSize;
+    MemstoreSize oldCF3MemstoreSize = cf3MemstoreSize;
 
     // Recalculate everything
-    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
-    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
-    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    cf1MemstoreSize = region.getStore(FAMILY1).getSizeOfMemStore();
+    cf2MemstoreSize = region.getStore(FAMILY2).getSizeOfMemStore();
+    cf3MemstoreSize = region.getStore(FAMILY3).getSizeOfMemStore();
     totalMemstoreSize = region.getMemstoreSize();
     smallestSeqInRegionCurrentMemstore = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
     // We should have cleared out only CF1, since we chose the flush thresholds
     // and number of puts accordingly.
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSize);
     // Nothing should have happened to CF2, ...
     assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
     // ... or CF3
@@ -202,9 +201,7 @@ public class TestPerColumnFamilyFlush {
     // LSN in the memstore of CF2.
     assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
     // Of course, this should hold too.
-    assertEquals(
-        totalMemstoreSize + (2 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
-        cf2MemstoreSize + cf3MemstoreSize);
+    assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
 
     // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
     for (int i = 1200; i < 2400; i++) {
@@ -217,26 +214,25 @@ public class TestPerColumnFamilyFlush {
     }
 
     // How much does the CF3 memstore occupy? Will be used later.
-    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    oldCF3MemstoreSize = region.getStore(FAMILY3).getSizeOfMemStore();
 
     // Flush again
     region.flush(false);
 
     // Recalculate everything
-    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
-    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
-    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    cf1MemstoreSize = region.getStore(FAMILY1).getSizeOfMemStore();
+    cf2MemstoreSize = region.getStore(FAMILY2).getSizeOfMemStore();
+    cf3MemstoreSize = region.getStore(FAMILY3).getSizeOfMemStore();
     totalMemstoreSize = region.getMemstoreSize();
     smallestSeqInRegionCurrentMemstore = getWAL(region)
         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
     // CF1 and CF2, both should be absent.
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSize);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSize);
     // CF3 shouldn't have been touched.
     assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
-    assertEquals(totalMemstoreSize + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
-        cf3MemstoreSize);
+    assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
     assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
 
     // What happens when we hit the memstore limit, but we are not able to find
@@ -288,35 +284,34 @@ public class TestPerColumnFamilyFlush {
     long totalMemstoreSize = region.getMemstoreSize();
 
     // Find the sizes of the memstores of each CF.
-    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    MemstoreSize cf1MemstoreSize = region.getStore(FAMILY1).getSizeOfMemStore();
+    MemstoreSize cf2MemstoreSize = region.getStore(FAMILY2).getSizeOfMemStore();
+    MemstoreSize cf3MemstoreSize = region.getStore(FAMILY3).getSizeOfMemStore();
 
     // Some other sanity checks.
-    assertTrue(cf1MemstoreSize > 0);
-    assertTrue(cf2MemstoreSize > 0);
-    assertTrue(cf3MemstoreSize > 0);
+    assertTrue(cf1MemstoreSize.getDataSize() > 0);
+    assertTrue(cf2MemstoreSize.getDataSize() > 0);
+    assertTrue(cf3MemstoreSize.getDataSize() > 0);
 
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    assertEquals(
-        totalMemstoreSize + (3 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
-        cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize);
+    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
+        + cf3MemstoreSize.getDataSize());
 
     // Flush!
     region.flush(false);
 
-    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
-    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
-    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    cf1MemstoreSize = region.getStore(FAMILY1).getSizeOfMemStore();
+    cf2MemstoreSize = region.getStore(FAMILY2).getSizeOfMemStore();
+    cf3MemstoreSize = region.getStore(FAMILY3).getSizeOfMemStore();
     totalMemstoreSize = region.getMemstoreSize();
     long smallestSeqInRegionCurrentMemstore =
         region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
     // Everything should have been cleared
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSize);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSize);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSize);
     assertEquals(0, totalMemstoreSize);
     assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
     HBaseTestingUtility.closeRegionAndWAL(region);
@@ -337,10 +332,10 @@ public class TestPerColumnFamilyFlush {
 
   private void doTestLogReplay() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
     // Carefully chosen limits so that the memstore just flushes when we're done
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 10000);
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
     final int numRegionServers = 4;
     try {
       TEST_UTIL.startMiniCluster(numRegionServers);
@@ -378,18 +373,16 @@ public class TestPerColumnFamilyFlush {
       totalMemstoreSize = desiredRegion.getMemstoreSize();
 
       // Find the sizes of the memstores of each CF.
-      cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
-      cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
-      cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
+      cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getSizeOfMemStore().getDataSize();
+      cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getSizeOfMemStore().getDataSize();
+      cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getSizeOfMemStore().getDataSize();
 
       // CF1 Should have been flushed
-      assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
+      assertEquals(0, cf1MemstoreSize);
       // CF2 and CF3 shouldn't have been flushed.
       assertTrue(cf2MemstoreSize > 0);
       assertTrue(cf3MemstoreSize > 0);
-      assertEquals(
-          totalMemstoreSize + (2 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
-          cf2MemstoreSize + cf3MemstoreSize);
+      assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
 
       // Wait for the RS report to go across to the master, so that the master
       // is aware of which sequence ids have been flushed, before we kill the RS.
@@ -526,12 +519,9 @@ public class TestPerColumnFamilyFlush {
       });
       LOG.info("Finished waiting on flush after too many WALs...");
       // Individual families should have been flushed.
-      assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-          desiredRegion.getStore(FAMILY1).getMemStoreSize());
-      assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-          desiredRegion.getStore(FAMILY2).getMemStoreSize());
-      assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
-          desiredRegion.getStore(FAMILY3).getMemStoreSize());
+      assertEquals(0, desiredRegion.getStore(FAMILY1).getMemStoreSize());
+      assertEquals(0, desiredRegion.getStore(FAMILY2).getMemStoreSize());
+      assertEquals(0, desiredRegion.getStore(FAMILY3).getMemStoreSize());
       // let WAL cleanOldLogs
       assertNull(getWAL(desiredRegion).rollWriter(true));
       assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);

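Alongside the accessor migration, TestPerColumnFamilyFlush retunes its thresholds: the doTestLogReplay() flush size drops from 20000 to 10000 and the per-family lower bound from 10000 to 2500, matching the earlier cut from 100 KB to 40 KB. A plausible reading, not stated in the patch itself, is that the sizes compared against these limits now exclude heap overhead, so smaller limits keep the tests flushing at the same points. The retuned configuration in one place:

    Configuration conf = TEST_UTIL.getConfiguration();
    // Region-wide flush trigger, halved by this patch (was 20000).
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
        FlushAllLargeStoresPolicy.class.getName());
    // Per-column-family lower bound, quartered by this patch (was 10000).
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
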
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
index 21d04d3..5bd2ff1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
@@ -24,13 +24,10 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -42,7 +39,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -177,6 +173,7 @@ public class TestRegionMergeTransaction {
     HStore storeMock = Mockito.mock(HStore.class);
     when(storeMock.hasReferences()).thenReturn(true);
     when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
+    when(storeMock.getSizeToFlush()).thenReturn(new MemstoreSize());
     when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
     this.region_a.stores.put(Bytes.toBytes(""), storeMock);
     RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a,

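TestRegionMergeTransaction, and TestSplitTransaction just below, each gain one new stub: a mocked HStore must now answer getSizeToFlush(), presumably because the close path reads the returned MemstoreSize and would otherwise dereference Mockito's default null (the failing call site is not shown in this diff). The stub as added in both tests:

    HStore storeMock = Mockito.mock(HStore.class);
    when(storeMock.hasReferences()).thenReturn(true);
    // New with this patch: give the mock a real (empty) MemstoreSize so size
    // reads during close do not see null.
    when(storeMock.getSizeToFlush()).thenReturn(new MemstoreSize());
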
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 0ec859c..00e8f0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -638,7 +638,7 @@ public class TestReversibleScanners {
       for (int i = 0; i < ROWSIZE; i++) {
         for (int j = 0; j < QUALSIZE; j++) {
           if (i % 2 == 0) {
-            memstore.add(makeKV(i, j));
+            memstore.add(makeKV(i, j), null);
           } else {
             writers[(i + j) % writers.length].append(makeKV(i, j));
           }
@@ -669,7 +669,7 @@ public class TestReversibleScanners {
     for (int i = 0; i < ROWSIZE; i++) {
       for (int j = 0; j < QUALSIZE; j++) {
         if ((i + j) % 2 == 0) {
-          memstore.add(makeKV(i, j));
+          memstore.add(makeKV(i, j), null);
         }
       }
     }
@@ -678,7 +678,7 @@ public class TestReversibleScanners {
     for (int i = 0; i < ROWSIZE; i++) {
       for (int j = 0; j < QUALSIZE; j++) {
         if ((i + j) % 2 == 1) {
-          memstore.add(makeKV(i, j));
+          memstore.add(makeKV(i, j), null);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
index 80be45a..1c31d21 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
@@ -28,13 +28,10 @@ import static org.mockito.Mockito.*;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -42,7 +39,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -176,6 +172,7 @@ public class TestSplitTransaction {
     HStore storeMock = Mockito.mock(HStore.class);
     when(storeMock.hasReferences()).thenReturn(true);
     when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
+    when(storeMock.getSizeToFlush()).thenReturn(new MemstoreSize());
     when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
     this.parent.stores.put(Bytes.toBytes(""), storeMock);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index e3e62fc..3f05381 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -53,14 +53,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -81,7 +79,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
-import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.util.Progressable;
@@ -220,10 +217,11 @@ public class TestStore {
         // Initialize region
         init(name.getMethodName(), conf);
 
-        long size = store.memstore.getFlushableSize();
-        Assert.assertEquals(0, size);
+        MemstoreSize size = store.memstore.getFlushableSize();
+        Assert.assertEquals(0, size.getDataSize());
         LOG.info("Adding some data");
-        long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
+        MemstoreSize kvSize = new MemstoreSize();
+        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), kvSize);
         size = store.memstore.getFlushableSize();
         Assert.assertEquals(kvSize, size);
         // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
@@ -236,7 +234,8 @@ public class TestStore {
         }
         size = store.memstore.getFlushableSize();
         Assert.assertEquals(kvSize, size);
-        store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
+        MemstoreSize kvSize2 = new MemstoreSize();
+        store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
         // Even though we add a new kv, we expect the flushable size to be 'same' since we have
         // not yet cleared the snapshot -- the above flush failed.
         Assert.assertEquals(kvSize, size);
@@ -244,10 +243,10 @@ public class TestStore {
         flushStore(store, id++);
         size = store.memstore.getFlushableSize();
         // Size should be the foreground kv size.
-        Assert.assertEquals(kvSize, size);
+        Assert.assertEquals(kvSize2, size);
         flushStore(store, id++);
         size = store.memstore.getFlushableSize();
-        Assert.assertEquals(0, size);
+        Assert.assertEquals(MemstoreSize.EMPTY_SIZE, size);
         return null;
       }
     });
@@ -317,9 +316,9 @@ public class TestStore {
     for (int i = 1; i <= storeFileNum; i++) {
       LOG.info("Adding some data for the store file #" + i);
       timeStamp = EnvironmentEdgeManager.currentTime();
-      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
-      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
-      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
+      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
+      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
+      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
       flush(i);
       edge.incrementTime(sleepTime);
     }
@@ -371,9 +370,9 @@ public class TestStore {
     int storeFileNum = 4;
     for (int i = 1; i <= storeFileNum; i++) {
       LOG.info("Adding some data for the store file #"+i);
-      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
-      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
-      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
+      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
+      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
+      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
       flush(i);
     }
     // after flush; check the lowest time stamp
@@ -424,8 +423,8 @@ public class TestStore {
   public void testEmptyStoreFile() throws IOException {
     init(this.name.getMethodName());
     // Write a store file.
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
     flush(1);
     // Now put in place an empty store file.  It's a little tricky.  Have to
     // do manually with hacked in sequence id.
@@ -462,12 +461,12 @@ public class TestStore {
     init(this.name.getMethodName());
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
 
     //Get
     result = HBaseTestingUtility.getFromStoreFile(store,
@@ -486,20 +485,20 @@ public class TestStore {
     init(this.name.getMethodName());
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
     //flush
     flush(3);
 
@@ -525,20 +524,20 @@ public class TestStore {
     init(this.name.getMethodName());
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
+    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);
 
     //Get
     result = HBaseTestingUtility.getFromStoreFile(store,
@@ -565,187 +564,12 @@ public class TestStore {
     }
   }
 
-  //////////////////////////////////////////////////////////////////////////////
-  // IncrementColumnValue tests
-  //////////////////////////////////////////////////////////////////////////////
-  /*
-   * test the internal details of how ICV works, especially during a flush scenario.
-   */
-  @Test
-  public void testIncrementColumnValue_ICVDuringFlush()
-      throws IOException, InterruptedException {
-    init(this.name.getMethodName());
-
-    long oldValue = 1L;
-    long newValue = 3L;
-    this.store.add(new KeyValue(row, family, qf1,
-        System.currentTimeMillis(),
-        Bytes.toBytes(oldValue)));
-
-    // snapshot the store.
-    this.store.snapshot();
-
-    // add other things:
-    this.store.add(new KeyValue(row, family, qf2,
-        System.currentTimeMillis(),
-        Bytes.toBytes(oldValue)));
-
-    // update during the snapshot.
-    long ret = this.store.updateColumnValue(row, family, qf1, newValue);
-
-    // memstore should have grown by some amount.
-    Assert.assertTrue(ret > 0);
-
-    // then flush.
-    flushStore(store, id++);
-    Assert.assertEquals(1, this.store.getStorefiles().size());
-    // from the one we inserted up there, and a new one
-    Assert.assertEquals(2, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
-
-    // how many key/values for this row are there?
-    Get get = new Get(row);
-    get.addColumn(family, qf1);
-    get.setMaxVersions(); // all versions.
-    List<Cell> results = new ArrayList<Cell>();
-
-    results = HBaseTestingUtility.getFromStoreFile(store, get);
-    Assert.assertEquals(2, results.size());
-
-    long ts1 = results.get(0).getTimestamp();
-    long ts2 = results.get(1).getTimestamp();
-
-    Assert.assertTrue(ts1 > ts2);
-
-    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
-  }
-
   @After
   public void tearDown() throws Exception {
     EnvironmentEdgeManagerTestHelper.reset();
   }
 
   @Test
-  public void testICV_negMemstoreSize()  throws IOException {
-      init(this.name.getMethodName());
-
-    long time = 100;
-    ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
-    ee.setValue(time);
-    EnvironmentEdgeManagerTestHelper.injectEdge(ee);
-    long newValue = 3L;
-    long size = 0;
-
-
-    size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
-        System.currentTimeMillis(), Bytes.toBytes(newValue)));
-    size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
-        System.currentTimeMillis(), Bytes.toBytes(newValue)));
-    size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
-        System.currentTimeMillis(), Bytes.toBytes(newValue)));
-    size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
-        System.currentTimeMillis(), Bytes.toBytes(newValue)));
-    size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
-        System.currentTimeMillis(), Bytes.toBytes(newValue)));
-
-
-    for ( int i = 0 ; i < 10000 ; ++i) {
-      newValue++;
-
-      long ret = this.store.updateColumnValue(row, family, qf1, newValue);
-      long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
-
-      if (ret != 0) System.out.println("ret: " + ret);
-      if (ret2 != 0) System.out.println("ret2: " + ret2);
-
-      Assert.assertTrue("ret: " + ret, ret >= 0);
-      size += ret;
-      Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
-      size += ret2;
-
-
-      if (i % 1000 == 0)
-        ee.setValue(++time);
-    }
-
-    long computedSize=0;
-    for (Cell cell : ((AbstractMemStore)this.store.memstore).getActive().getCellSet()) {
-      long kvsize = DefaultMemStore.heapSizeChange(cell, true);
-      //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
-      computedSize += kvsize;
-    }
-    Assert.assertEquals(computedSize, size);
-  }
-
-  @Test
-  public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
-    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
-    EnvironmentEdgeManagerTestHelper.injectEdge(mee);
-    init(this.name.getMethodName());
-
-    long oldValue = 1L;
-    long newValue = 3L;
-    this.store.add(new KeyValue(row, family, qf1,
-        EnvironmentEdgeManager.currentTime(),
-        Bytes.toBytes(oldValue)));
-
-    // snapshot the store.
-    this.store.snapshot();
-
-    // update during the snapshot, the exact same TS as the Put (lololol)
-    long ret = this.store.updateColumnValue(row, family, qf1, newValue);
-
-    // memstore should have grown by some amount.
-    Assert.assertTrue(ret > 0);
-
-    // then flush.
-    flushStore(store, id++);
-    Assert.assertEquals(1, this.store.getStorefiles().size());
-    Assert.assertEquals(1, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
-
-    // now increment again:
-    newValue += 1;
-    this.store.updateColumnValue(row, family, qf1, newValue);
-
-    // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
-    newValue += 1;
-    this.store.updateColumnValue(row, family, qf1, newValue);
-
-    // the second TS should be TS=2 or higher., even though 'time=1' right now.
-
-
-    // how many key/values for this row are there?
-    Get get = new Get(row);
-    get.addColumn(family, qf1);
-    get.setMaxVersions(); // all versions.
-    List<Cell> results = new ArrayList<Cell>();
-
-    results = HBaseTestingUtility.getFromStoreFile(store, get);
-    Assert.assertEquals(2, results.size());
-
-    long ts1 = results.get(0).getTimestamp();
-    long ts2 = results.get(1).getTimestamp();
-
-    Assert.assertTrue(ts1 > ts2);
-    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
-
-    mee.setValue(2); // time goes up slightly
-    newValue += 1;
-    this.store.updateColumnValue(row, family, qf1, newValue);
-
-    results = HBaseTestingUtility.getFromStoreFile(store, get);
-    Assert.assertEquals(2, results.size());
-
-    ts1 = results.get(0).getTimestamp();
-    ts2 = results.get(1).getTimestamp();
-
-    Assert.assertTrue(ts1 > ts2);
-    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
-  }
-
-  @Test
   public void testHandleErrorsInFlush() throws Exception {
     LOG.info("Setting up a faulty file system that cannot write");
 
@@ -766,9 +590,9 @@ public class TestStore {
         init(name.getMethodName(), conf);
 
         LOG.info("Adding some data");
-        store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-        store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
-        store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
+        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
+        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
+        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
 
         LOG.info("Before flush, we should have no files");
 
@@ -899,7 +723,7 @@ public class TestStore {
 
     List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
     for (Cell kv : kvList1) {
-      this.store.add(KeyValueUtil.ensureKeyValue(kv));
+      this.store.add(kv, null);
     }
 
     this.store.snapshot();
@@ -907,7 +731,7 @@ public class TestStore {
 
     List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
     for(Cell kv : kvList2) {
-      this.store.add(KeyValueUtil.ensureKeyValue(kv));
+      this.store.add(kv, null);
     }
 
     List<Cell> result;
@@ -1049,7 +873,7 @@ public class TestStore {
     assertEquals(0, this.store.getStorefilesCount());
 
     // add some data, flush
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
     flush(1);
     assertEquals(1, this.store.getStorefilesCount());
 
@@ -1097,7 +921,7 @@ public class TestStore {
     assertEquals(0, this.store.getStorefilesCount());
 
     // add some data, flush
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
     flush(1);
     // add one more file
     addStoreFile();