You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by es...@apache.org on 2018/02/18 07:59:42 UTC

[1/2] hbase git commit: HBASE-18294 Reduce global heap pressure: flush based on heap occupancy

Repository: hbase
Updated Branches:
  refs/heads/master f3ff55a2b -> f3bb9b961


http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 7689fcd..1c627f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -31,17 +31,17 @@ import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * RegionServerAccounting keeps record of some basic real time information about
- * the Region Server. Currently, it keeps record the global memstore size and global memstore heap
- * overhead. It also tracks the replay edits per region.
+ * the Region Server. Currently, it keeps record the global memstore size and global memstore
+ * on-heap and off-heap overhead. It also tracks the replay edits per region.
  */
 @InterfaceAudience.Private
 public class RegionServerAccounting {
   // memstore data size
-  private final LongAdder globalMemstoreDataSize = new LongAdder();
-  // memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell
-  // POJOs and entry overhead of them onto memstore. When on heap MSLAB, this will be include heap
-  // overhead as well as the cell data size. Ya cell data is in on heap area only then.
-  private final LongAdder globalMemstoreHeapSize = new LongAdder();
+  private final LongAdder globalMemStoreDataSize = new LongAdder();
+  // memstore heap size.
+  private final LongAdder globalMemStoreHeapSize = new LongAdder();
+  // memstore off-heap size.
+  private final LongAdder globalMemStoreOffHeapSize = new LongAdder();
 
   // Store the edits size during replaying WAL. Use this to roll back the
   // global memstore size once a region opening failed.
@@ -114,14 +114,21 @@ public class RegionServerAccounting {
    * @return the global Memstore data size in the RegionServer
    */
   public long getGlobalMemStoreDataSize() {
-    return globalMemstoreDataSize.sum();
+    return globalMemStoreDataSize.sum();
   }
 
   /**
    * @return the global memstore heap size in the RegionServer
    */
   public long getGlobalMemStoreHeapSize() {
-    return this.globalMemstoreHeapSize.sum();
+    return this.globalMemStoreHeapSize.sum();
+  }
+
+  /**
+   * @return the global memstore heap size in the RegionServer
+   */
+  public long getGlobalMemStoreOffHeapSize() {
+    return this.globalMemStoreOffHeapSize.sum();
   }
 
   /**
@@ -129,13 +136,15 @@ public class RegionServerAccounting {
    *        the global Memstore size
    */
   public void incGlobalMemStoreSize(MemStoreSize memStoreSize) {
-    globalMemstoreDataSize.add(memStoreSize.getDataSize());
-    globalMemstoreHeapSize.add(memStoreSize.getHeapSize());
+    globalMemStoreDataSize.add(memStoreSize.getDataSize());
+    globalMemStoreHeapSize.add(memStoreSize.getHeapSize());
+    globalMemStoreOffHeapSize.add(memStoreSize.getOffHeapSize());
   }
 
   public void decGlobalMemStoreSize(MemStoreSize memStoreSize) {
-    globalMemstoreDataSize.add(-memStoreSize.getDataSize());
-    globalMemstoreHeapSize.add(-memStoreSize.getHeapSize());
+    globalMemStoreDataSize.add(-memStoreSize.getDataSize());
+    globalMemStoreHeapSize.add(-memStoreSize.getHeapSize());
+    globalMemStoreOffHeapSize.add(-memStoreSize.getOffHeapSize());
   }
 
   /**
@@ -151,13 +160,13 @@ public class RegionServerAccounting {
       }
     } else {
       // If the configured memstore is offheap, check for two things
-      // 1) If the global memstore data size is greater than the configured
+      // 1) If the global memstore off-heap size is greater than the configured
       // 'hbase.regionserver.offheap.global.memstore.size'
       // 2) If the global memstore heap size is greater than the configured onheap
       // global memstore limit 'hbase.regionserver.global.memstore.size'.
       // We do this to avoid OOME incase of scenarios where the heap is occupied with
       // lot of onheap references to the cells in memstore
-      if (getGlobalMemStoreDataSize() >= globalMemStoreLimit) {
+      if (getGlobalMemStoreOffHeapSize() >= globalMemStoreLimit) {
         // Indicates that global memstore size is above the configured
         // 'hbase.regionserver.offheap.global.memstore.size'
         return FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
@@ -181,8 +190,8 @@ public class RegionServerAccounting {
         return FlushType.ABOVE_ONHEAP_LOWER_MARK;
       }
     } else {
-      if (getGlobalMemStoreDataSize() >= globalMemStoreLimitLowMark) {
-        // Indicates that the offheap memstore's data size is greater than the global memstore
+      if (getGlobalMemStoreOffHeapSize() >= globalMemStoreLimitLowMark) {
+        // Indicates that the offheap memstore's size is greater than the global memstore
         // lower limit
         return FlushType.ABOVE_OFFHEAP_LOWER_MARK;
       } else if (getGlobalMemStoreHeapSize() >= globalOnHeapMemstoreLimitLowMark) {
@@ -203,7 +212,7 @@ public class RegionServerAccounting {
     if (memType == MemoryType.HEAP) {
       return (getGlobalMemStoreHeapSize()) * 1.0 / globalMemStoreLimitLowMark;
     } else {
-      return Math.max(getGlobalMemStoreDataSize() * 1.0 / globalMemStoreLimitLowMark,
+      return Math.max(getGlobalMemStoreOffHeapSize() * 1.0 / globalMemStoreLimitLowMark,
           getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index a1f5755..5b98a27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -66,7 +66,7 @@ public class RegionServicesForStores {
   }
 
   public void addMemStoreSize(MemStoreSize size) {
-    region.addAndGetMemStoreSize(size);
+    region.incMemStoreSize(size);
   }
 
   public RegionInfo getRegionInfo() {
@@ -89,6 +89,6 @@ public class RegionServicesForStores {
 
   @VisibleForTesting
   long getMemStoreSize() {
-    return region.getMemStoreSize();
+    return region.getMemStoreDataSize();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 5bfab52..66a2ad5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.Cell;
@@ -48,9 +47,9 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @InterfaceAudience.Private
 public abstract class Segment {
 
-  public final static long FIXED_OVERHEAD = ClassSize.align((long)ClassSize.OBJECT
-      + 6 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, dataSize,
-                                // heapSize, and timeRangeTracker
+  public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+      + 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, memStoreSizing,
+                                // and timeRangeTracker
       + Bytes.SIZEOF_LONG // minSequenceId
       + Bytes.SIZEOF_BOOLEAN); // tagsPresent
   public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
@@ -62,8 +61,7 @@ public abstract class Segment {
   private MemStoreLAB memStoreLAB;
   // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not
   // including the heap overhead of this class.
-  protected final AtomicLong dataSize;
-  protected final AtomicLong heapSize;
+  protected final MemStoreSizing segmentSize;
   protected final TimeRangeTracker timeRangeTracker;
   protected volatile boolean tagsPresent;
 
@@ -71,8 +69,23 @@ public abstract class Segment {
   // and there is no need in true Segments state
   protected Segment(CellComparator comparator, TimeRangeTracker trt) {
     this.comparator = comparator;
-    this.dataSize = new AtomicLong(0);
-    this.heapSize = new AtomicLong(0);
+    this.segmentSize = new MemStoreSizing();
+    this.timeRangeTracker = trt;
+  }
+
+  protected Segment(CellComparator comparator, List<ImmutableSegment> segments,
+      TimeRangeTracker trt) {
+    long dataSize = 0;
+    long heapSize = 0;
+    long OffHeapSize = 0;
+    for (Segment segment : segments) {
+      MemStoreSize memStoreSize = segment.getMemStoreSize();
+      dataSize += memStoreSize.getDataSize();
+      heapSize += memStoreSize.getHeapSize();
+      OffHeapSize += memStoreSize.getOffHeapSize();
+    }
+    this.comparator = comparator;
+    this.segmentSize = new MemStoreSizing(dataSize, heapSize, OffHeapSize);
     this.timeRangeTracker = trt;
   }
 
@@ -82,8 +95,7 @@ public abstract class Segment {
     this.comparator = comparator;
     this.minSequenceId = Long.MAX_VALUE;
     this.memStoreLAB = memStoreLAB;
-    this.dataSize = new AtomicLong(0);
-    this.heapSize = new AtomicLong(0);
+    this.segmentSize = new MemStoreSizing();
     this.tagsPresent = false;
     this.timeRangeTracker = trt;
   }
@@ -93,8 +105,7 @@ public abstract class Segment {
     this.comparator = segment.getComparator();
     this.minSequenceId = segment.getMinSequenceId();
     this.memStoreLAB = segment.getMemStoreLAB();
-    this.dataSize = new AtomicLong(segment.keySize());
-    this.heapSize = new AtomicLong(segment.heapSize.get());
+    this.segmentSize = new MemStoreSizing(segment.getMemStoreSize());
     this.tagsPresent = segment.isTagsPresent();
     this.timeRangeTracker = segment.getTimeRangeTracker();
   }
@@ -134,17 +145,6 @@ public abstract class Segment {
   }
 
   /**
-   * @return the first cell in the segment that has equal or greater key than the given cell
-   */
-  public Cell getFirstAfter(Cell cell) {
-    SortedSet<Cell> snTailSet = tailSet(cell);
-    if (!snTailSet.isEmpty()) {
-      return snTailSet.first();
-    }
-    return null;
-  }
-
-  /**
    * Closing a segment before it is being discarded
    */
   public void close() {
@@ -221,27 +221,39 @@ public abstract class Segment {
     return this;
   }
 
+  public MemStoreSize getMemStoreSize() {
+    return this.segmentSize;
+  }
+
   /**
    * @return Sum of all cell's size.
    */
   public long keySize() {
-    return this.dataSize.get();
+    return this.segmentSize.getDataSize();
   }
 
   /**
    * @return The heap size of this segment.
    */
   public long heapSize() {
-    return this.heapSize.get();
+    return this.segmentSize.getHeapSize();
+  }
+
+  /**
+   * @return The off-heap size of this segment.
+   */
+  public long offHeapSize() {
+    return this.segmentSize.getOffHeapSize();
   }
 
   /**
    * Updates the size counters of the segment by the given delta
    */
   //TODO
-  protected void incSize(long delta, long heapOverhead) {
-    this.dataSize.addAndGet(delta);
-    this.heapSize.addAndGet(heapOverhead);
+  protected void incSize(long delta, long heapOverhead, long offHeapOverhead) {
+    synchronized (this) {
+      this.segmentSize.incMemStoreSize(delta, heapOverhead, offHeapOverhead);
+    }
   }
 
   public long getMinSequenceId() {
@@ -303,9 +315,10 @@ public abstract class Segment {
       cellSize = getCellLength(cellToAdd);
     }
     long heapSize = heapSizeChange(cellToAdd, succ);
-    incSize(cellSize, heapSize);
+    long offHeapSize = offHeapSizeChange(cellToAdd, succ);
+    incSize(cellSize, heapSize, offHeapSize);
     if (memstoreSizing != null) {
-      memstoreSizing.incMemStoreSize(cellSize, heapSize);
+      memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize);
     }
     getTimeRangeTracker().includeTimestamp(cellToAdd);
     minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
@@ -327,10 +340,48 @@ public abstract class Segment {
    *         heap size itself and additional overhead because of addition on to CSLM.
    */
   protected long heapSizeChange(Cell cell, boolean succ) {
+    long res = 0;
     if (succ) {
-      return ClassSize
-          .align(indexEntrySize() + PrivateCellUtil.estimatedHeapSizeOf(cell));
+      boolean onHeap = true;
+      MemStoreLAB memStoreLAB = getMemStoreLAB();
+      if(memStoreLAB != null) {
+        onHeap = memStoreLAB.isOnHeap();
+      }
+      res += indexEntryOnHeapSize(onHeap);
+      if(onHeap) {
+        res += PrivateCellUtil.estimatedSizeOfCell(cell);
+      }
+      res = ClassSize.align(res);
     }
+    return res;
+  }
+
+  protected long offHeapSizeChange(Cell cell, boolean succ) {
+    long res = 0;
+    if (succ) {
+      boolean offHeap = false;
+      MemStoreLAB memStoreLAB = getMemStoreLAB();
+      if(memStoreLAB != null) {
+        offHeap = memStoreLAB.isOffHeap();
+      }
+      res += indexEntryOffHeapSize(offHeap);
+      if(offHeap) {
+        res += PrivateCellUtil.estimatedSizeOfCell(cell);
+      }
+      res = ClassSize.align(res);
+    }
+    return res;
+  }
+
+  protected long indexEntryOnHeapSize(boolean onHeap) {
+    // in most cases index is allocated on-heap
+    // override this method when it is not always the case, e.g., in CCM
+    return indexEntrySize();
+  }
+
+  protected long indexEntryOffHeapSize(boolean offHeap) {
+    // in most cases index is allocated on-heap
+    // override this method when it is not always the case, e.g., in CCM
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 0b9b547..1624810 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -600,7 +600,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
             // Update the progress of the scanner context
             scannerContext.incrementSizeProgress(cellSize,
-              PrivateCellUtil.estimatedHeapSizeOf(cell));
+              PrivateCellUtil.estimatedSizeOfCell(cell));
             scannerContext.incrementBatchProgress(1);
 
             if (matcher.isUserScan() && totalBytesRead > maxRowSize) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
index e7c7caf..1d4dc1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
@@ -168,7 +168,7 @@ public class WALEdit implements HeapSize {
   public long heapSize() {
     long ret = ClassSize.ARRAYLIST;
     for (Cell cell : cells) {
-      ret += PrivateCellUtil.estimatedHeapSizeOf(cell);
+      ret += PrivateCellUtil.estimatedSizeOfCell(cell);
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java
index f66a828..2e2d978 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java
@@ -99,7 +99,7 @@ public class TestGlobalMemStoreSize {
       long globalMemStoreSize = 0;
       for (RegionInfo regionInfo :
           ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) {
-        globalMemStoreSize += server.getRegion(regionInfo.getEncodedName()).getMemStoreSize();
+        globalMemStoreSize += server.getRegion(regionInfo.getEncodedName()).getMemStoreDataSize();
       }
       assertEquals(server.getRegionServerAccounting().getGlobalMemStoreDataSize(),
         globalMemStoreSize);
@@ -130,7 +130,7 @@ public class TestGlobalMemStoreSize {
         for (RegionInfo regionInfo :
             ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) {
           HRegion r = server.getRegion(regionInfo.getEncodedName());
-          long l = r.getMemStoreSize();
+          long l = r.getMemStoreDataSize();
           if (l > 0) {
             // Only meta could have edits at this stage.  Give it another flush
             // clear them.

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index 3038744..965243f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -391,7 +391,7 @@ public class TestPartialResultsFromClientSide {
       // Estimate the cell heap size. One difference is that on server side, the KV Heap size is
       // estimated differently in case the cell is backed up by MSLAB byte[] (no overhead for
       // backing array). Thus below calculation is a bit brittle.
-      CELL_HEAP_SIZE = PrivateCellUtil.estimatedHeapSizeOf(result.rawCells()[0])
+      CELL_HEAP_SIZE = PrivateCellUtil.estimatedSizeOfCell(result.rawCells()[0])
           - (ClassSize.ARRAY+3);
       if (LOG.isInfoEnabled()) LOG.info("Cell heap size: " + CELL_HEAP_SIZE);
       scanner.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
index 162be35..5a3ba82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
@@ -156,7 +155,7 @@ public class TestServerSideScanMetricsFromClientSide {
       assertTrue(result.rawCells() != null);
       assertTrue(result.rawCells().length == 1);
 
-      CELL_HEAP_SIZE = PrivateCellUtil.estimatedHeapSizeOf(result.rawCells()[0]);
+      CELL_HEAP_SIZE = PrivateCellUtil.estimatedSizeOfCell(result.rawCells()[0]);
       scanner.close();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index fbb87bb..762dbd1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -215,30 +215,30 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
     ASYNC_CONN.getTable(tableName)
         .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
         .join();
-    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize() > 0);
+    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
     // flush region and wait flush operation finished.
     LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
     admin.flushRegion(hri.getRegionName()).get();
     LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
     Threads.sleepWithoutInterrupt(500);
-    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize() > 0) {
+    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
       Threads.sleep(50);
     }
     // check the memstore.
-    assertEquals(0, regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize());
+    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
 
     // write another put into the specific region
     ASYNC_CONN.getTable(tableName)
         .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
         .join();
-    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize() > 0);
+    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
     admin.flush(tableName).get();
     Threads.sleepWithoutInterrupt(500);
-    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize() > 0) {
+    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
       Threads.sleep(50);
     }
     // check the memstore.
-    assertEquals(0, regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize());
+    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
   }
 
   private void waitUntilMobCompactionFinished(TableName tableName)

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index 48d9a93..d6f32f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -36,9 +36,7 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.MemStoreSize;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -67,7 +65,7 @@ public class TestClientPushback {
   private static final TableName tableName = TableName.valueOf("client-pushback");
   private static final byte[] family = Bytes.toBytes("f");
   private static final byte[] qualifier = Bytes.toBytes("q");
-  private static final long flushSizeBytes = 256;
+  private static final long flushSizeBytes = 512;
 
   @BeforeClass
   public static void setupCluster() throws Exception{
@@ -110,7 +108,7 @@ public class TestClientPushback {
     mutator.flush();
 
     // get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data
-    int load = (int) ((((HRegion) region).addAndGetMemStoreSize(new MemStoreSize(0, 0)) * 100)
+    int load = (int) ((region.getMemStoreHeapSize() * 100)
         / flushSizeBytes);
     LOG.debug("Done writing some data to "+tableName);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
index dc3f8da..207e1fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
@@ -93,7 +93,7 @@ public class TestFlushFromClient {
       t.put(puts);
     }
     assertFalse(getRegionInfo().isEmpty());
-    assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreSize() != 0));
+    assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0));
   }
 
   @After
@@ -108,7 +108,7 @@ public class TestFlushFromClient {
   public void testFlushTable() throws Exception {
     try (Admin admin = TEST_UTIL.getAdmin()) {
       admin.flush(tableName);
-      assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0));
+      assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
     }
   }
 
@@ -116,7 +116,7 @@ public class TestFlushFromClient {
   public void testAsyncFlushTable() throws Exception {
     AsyncAdmin admin = asyncConn.getAdmin();
     admin.flush(tableName).get();
-    assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0));
+    assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
   }
 
   @Test
@@ -125,7 +125,7 @@ public class TestFlushFromClient {
       for (HRegion r : getRegionInfo()) {
         admin.flushRegion(r.getRegionInfo().getRegionName());
         TimeUnit.SECONDS.sleep(1);
-        assertEquals(0, r.getMemStoreSize());
+        assertEquals(0, r.getMemStoreDataSize());
       }
     }
   }
@@ -136,7 +136,7 @@ public class TestFlushFromClient {
     for (HRegion r : getRegionInfo()) {
       admin.flushRegion(r.getRegionInfo().getRegionName()).get();
       TimeUnit.SECONDS.sleep(1);
-      assertEquals(0, r.getMemStoreSize());
+      assertEquals(0, r.getMemStoreDataSize());
     }
   }
 
@@ -148,7 +148,7 @@ public class TestFlushFromClient {
             .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
             .collect(Collectors.toList())) {
         admin.flushRegionServer(rs.getServerName());
-        assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0));
+        assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
       }
     }
   }
@@ -161,7 +161,7 @@ public class TestFlushFromClient {
       .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
       .collect(Collectors.toList())) {
       admin.flushRegionServer(rs.getServerName()).get();
-      assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0));
+      assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSizeFailures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSizeFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSizeFailures.java
index a336274..cadce8a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSizeFailures.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSizeFailures.java
@@ -53,7 +53,7 @@ public class TestSizeFailures {
   private static byte [] FAMILY = Bytes.toBytes("testFamily");
   protected static int SLAVES = 1;
   private static TableName TABLENAME;
-  private static final int NUM_ROWS = 1000 * 1000, NUM_COLS = 10;
+  private static final int NUM_ROWS = 1000 * 1000, NUM_COLS = 9;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -129,7 +129,7 @@ public class TestSizeFailures {
       long rowsObserved = entry.getKey();
       long entriesObserved = entry.getValue();
 
-      // Verify that we see 1M rows and 10M cells
+      // Verify that we see 1M rows and 9M cells
       assertEquals(NUM_ROWS, rowsObserved);
       assertEquals(NUM_ROWS * NUM_COLS, entriesObserved);
     }
@@ -152,7 +152,7 @@ public class TestSizeFailures {
       long rowsObserved = entry.getKey();
       long entriesObserved = entry.getValue();
 
-      // Verify that we see 1M rows and 10M cells
+      // Verify that we see 1M rows and 9M cells
       assertEquals(NUM_ROWS, rowsObserved);
       assertEquals(NUM_ROWS * NUM_COLS, entriesObserved);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestNegativeMemStoreSizeWithSlowCoprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestNegativeMemStoreSizeWithSlowCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestNegativeMemStoreSizeWithSlowCoprocessor.java
index 336d342..4a92d4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestNegativeMemStoreSizeWithSlowCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestNegativeMemStoreSizeWithSlowCoprocessor.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.MemStoreSize;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -111,7 +110,7 @@ public class TestNegativeMemStoreSizeWithSlowCoprocessor {
 
       if (Bytes.equals(put.getRow(), Bytes.toBytes("row2"))) {
         region.flush(false);
-        Assert.assertTrue(region.addAndGetMemStoreSize(new MemStoreSize()) >= 0);
+        Assert.assertTrue(region.getMemStoreDataSize() >= 0);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index d68191c..505c2f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -699,7 +699,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
     int oneCellOnCSLMHeapSize = 120;
     int oneCellOnCAHeapSize = 88;
-    assertEquals(totalCellsLen1, region.getMemStoreSize());
+    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
     long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
@@ -780,7 +780,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
     int oneCellOnCSLMHeapSize = 120;
-    assertEquals(totalCellsLen1, region.getMemStoreSize());
+    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
     long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
     assertEquals(totalHeapSize, memstore.heapSize());
 
@@ -838,7 +838,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
     }
     regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size,
-        hmc.getActive().heapSize() - heapOverhead));
+        hmc.getActive().heapSize() - heapOverhead, 0));
     return totalLen;
   }
 
@@ -859,7 +859,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
     }
     regionServicesForStores.addMemStoreSize(new MemStoreSize(hmc.getActive().keySize() - size,
-            hmc.getActive().heapSize() - heapOverhead));
+            hmc.getActive().heapSize() - heapOverhead, 0));
     return totalLen;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index a8561de..12b078a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -221,7 +221,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     long cellBeforeFlushSize = cellBeforeFlushSize();
     long cellAfterFlushSize = cellAfterFlushSize();
     long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
-    assertEquals(totalCellsLen1, region.getMemStoreSize());
+    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
     assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
 
     MemStoreSize size = memstore.getFlushableSize();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index 2c12341..b0302f6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -321,7 +321,7 @@ public class TestEndToEndSplitTransaction {
     admin.flushRegion(regionName);
     log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
     Threads.sleepWithoutInterrupt(500);
-    while (rs.getOnlineRegion(regionName).getMemStoreSize() > 0) {
+    while (rs.getOnlineRegion(regionName).getMemStoreDataSize() > 0) {
       Threads.sleep(50);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 88e1aa2..31dfa2a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -310,7 +310,7 @@ public class TestHRegion {
     region.put(put);
     // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
     region.close();
-    assertEquals(0, region.getMemStoreSize());
+    assertEquals(0, region.getMemStoreDataSize());
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
@@ -391,14 +391,14 @@ public class TestHRegion {
     HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
         COLUMN_FAMILY_BYTES);
     HStore store = region.getStore(COLUMN_FAMILY_BYTES);
-    assertEquals(0, region.getMemStoreSize());
+    assertEquals(0, region.getMemStoreDataSize());
 
     // Put one value
     byte [] value = Bytes.toBytes(method);
     Put put = new Put(value);
     put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
     region.put(put);
-    long onePutSize = region.getMemStoreSize();
+    long onePutSize = region.getMemStoreDataSize();
     assertTrue(onePutSize > 0);
 
     RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
@@ -414,7 +414,7 @@ public class TestHRegion {
     } catch (IOException expected) {
     }
     long expectedSize = onePutSize * 2;
-    assertEquals("memstoreSize should be incremented", expectedSize, region.getMemStoreSize());
+    assertEquals("memstoreSize should be incremented", expectedSize, region.getMemStoreDataSize());
     assertEquals("flushable size should be incremented", expectedSize,
         store.getFlushableSize().getDataSize());
 
@@ -459,13 +459,13 @@ public class TestHRegion {
           // Initialize region
           region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
               COLUMN_FAMILY_BYTES);
-          long size = region.getMemStoreSize();
+          long size = region.getMemStoreDataSize();
           Assert.assertEquals(0, size);
           // Put one item into memstore.  Measure the size of one item in memstore.
           Put p1 = new Put(row);
           p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
           region.put(p1);
-          final long sizeOfOnePut = region.getMemStoreSize();
+          final long sizeOfOnePut = region.getMemStoreDataSize();
           // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
           try {
             LOG.info("Flushing");
@@ -478,7 +478,7 @@ public class TestHRegion {
           // Make it so all writes succeed from here on out
           ffs.fault.set(false);
           // Check sizes.  Should still be the one entry.
-          Assert.assertEquals(sizeOfOnePut, region.getMemStoreSize());
+          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
           // Now add two entries so that on this next flush that fails, we can see if we
           // subtract the right amount, the snapshot size only.
           Put p2 = new Put(row);
@@ -486,13 +486,13 @@ public class TestHRegion {
           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
           region.put(p2);
           long expectedSize = sizeOfOnePut * 3;
-          Assert.assertEquals(expectedSize, region.getMemStoreSize());
+          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
           // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
           // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
           // it
           region.flush(true);
           // Make sure our memory accounting is right.
-          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreSize());
+          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
         } finally {
           HBaseTestingUtility.closeRegionAndWAL(region);
         }
@@ -524,7 +524,7 @@ public class TestHRegion {
           // Initialize region
           region = initHRegion(tableName, null, null, false,
               Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
-          long size = region.getMemStoreSize();
+          long size = region.getMemStoreDataSize();
           Assert.assertEquals(0, size);
           // Put one item into memstore.  Measure the size of one item in memstore.
           Put p1 = new Put(row);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 44ff84d..26b57b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -366,7 +366,7 @@ public class TestHRegionReplayEvents {
         verifyData(secondaryRegion, 0, lastReplayed, cq, families);
         HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
         long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
-        long regionMemstoreSize = secondaryRegion.getMemStoreSize();
+        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
         long storeFlushableSize = store.getFlushableSize().getHeapSize();
         long storeSize = store.getSize();
         long storeSizeUncompressed = store.getStoreSizeUncompressed();
@@ -395,7 +395,7 @@ public class TestHRegionReplayEvents {
           assertTrue(storeFlushableSize > newFlushableSize);
 
           // assert that the region memstore is smaller now
-          long newRegionMemstoreSize = secondaryRegion.getMemStoreSize();
+          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
           assertTrue(regionMemstoreSize > newRegionMemstoreSize);
 
           // assert that the store sizes are bigger
@@ -465,7 +465,7 @@ public class TestHRegionReplayEvents {
         // first verify that everything is replayed and visible before flush event replay
         HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
         long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
-        long regionMemstoreSize = secondaryRegion.getMemStoreSize();
+        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
         long storeFlushableSize = store.getFlushableSize().getHeapSize();
 
         if (flushDesc.getAction() == FlushAction.START_FLUSH) {
@@ -505,7 +505,7 @@ public class TestHRegionReplayEvents {
     assertNotNull(secondaryRegion.getPrepareFlushResult());
     assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
       startFlushDesc.getFlushSequenceNumber());
-    assertTrue(secondaryRegion.getMemStoreSize() > 0); // memstore is not empty
+    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
     verifyData(secondaryRegion, 0, numRows, cq, families);
 
     // Test case 2: replay a flush start marker with a smaller seqId
@@ -518,7 +518,7 @@ public class TestHRegionReplayEvents {
     assertNotNull(secondaryRegion.getPrepareFlushResult());
     assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
       startFlushDesc.getFlushSequenceNumber());
-    assertTrue(secondaryRegion.getMemStoreSize() > 0); // memstore is not empty
+    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
     verifyData(secondaryRegion, 0, numRows, cq, families);
 
     // Test case 3: replay a flush start marker with a larger seqId
@@ -531,7 +531,7 @@ public class TestHRegionReplayEvents {
     assertNotNull(secondaryRegion.getPrepareFlushResult());
     assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
       startFlushDesc.getFlushSequenceNumber());
-    assertTrue(secondaryRegion.getMemStoreSize() > 0); // memstore is not empty
+    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
     verifyData(secondaryRegion, 0, numRows, cq, families);
 
     LOG.info("-- Verifying edits from secondary");
@@ -600,7 +600,7 @@ public class TestHRegionReplayEvents {
     for (HStore s : secondaryRegion.getStores()) {
       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
     }
-    long regionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
 
     // Test case 1: replay the a flush commit marker smaller than what we have prepared
     LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
@@ -620,7 +620,7 @@ public class TestHRegionReplayEvents {
     assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
 
     // assert that the region memstore is same as before
-    long newRegionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
     assertEquals(regionMemstoreSize, newRegionMemstoreSize);
 
     assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
@@ -690,7 +690,7 @@ public class TestHRegionReplayEvents {
     for (HStore s : secondaryRegion.getStores()) {
       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
     }
-    long regionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
 
     // Test case 1: replay the a flush commit marker larger than what we have prepared
     LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
@@ -710,7 +710,7 @@ public class TestHRegionReplayEvents {
     assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
 
     // assert that the region memstore is smaller than before, but not empty
-    long newRegionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
     assertTrue(newRegionMemstoreSize > 0);
     assertTrue(regionMemstoreSize > newRegionMemstoreSize);
 
@@ -791,7 +791,7 @@ public class TestHRegionReplayEvents {
     for (HStore s : secondaryRegion.getStores()) {
       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
     }
-    long regionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
 
     // Test case 1: replay a flush commit marker without start flush marker
     assertNull(secondaryRegion.getPrepareFlushResult());
@@ -820,7 +820,7 @@ public class TestHRegionReplayEvents {
     }
 
     // assert that the region memstore is same as before (we could not drop)
-    long newRegionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
     if (droppableMemstore) {
       assertTrue(0 == newRegionMemstoreSize);
     } else {
@@ -890,7 +890,7 @@ public class TestHRegionReplayEvents {
     for (HStore s : secondaryRegion.getStores()) {
       assertEquals(expectedStoreFileCount, s.getStorefilesCount());
     }
-    long regionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
     assertTrue(regionMemstoreSize == 0);
 
     // now replay the region open event that should contain new file locations
@@ -907,7 +907,7 @@ public class TestHRegionReplayEvents {
     assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD);
 
     // assert that the region memstore is empty
-    long newRegionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
     assertTrue(newRegionMemstoreSize == 0);
 
     assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
@@ -986,7 +986,7 @@ public class TestHRegionReplayEvents {
     assertTrue(newSnapshotSize.getDataSize() == 0);
 
     // assert that the region memstore is empty
-    long newRegionMemstoreSize = secondaryRegion.getMemStoreSize();
+    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
     assertTrue(newRegionMemstoreSize == 0);
 
     assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
@@ -1434,7 +1434,7 @@ public class TestHRegionReplayEvents {
     LOG.info("-- Replaying edits in secondary");
 
     // Test case 4: replay some edits, ensure that memstore is dropped.
-    assertTrue(secondaryRegion.getMemStoreSize() == 0);
+    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
     putDataWithFlushes(primaryRegion, 400, 400, 0);
     numRows = 400;
 
@@ -1452,11 +1452,11 @@ public class TestHRegionReplayEvents {
       }
     }
 
-    assertTrue(secondaryRegion.getMemStoreSize() > 0);
+    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);
 
     secondaryRegion.refreshStoreFiles();
 
-    assertTrue(secondaryRegion.getMemStoreSize() == 0);
+    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
 
     LOG.info("-- Verifying edits from primary");
     verifyData(primaryRegion, 0, numRows, cq, families);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index ea40200..9479890 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -265,7 +265,7 @@ public class TestHStore {
         MemStoreSizing kvSize = new MemStoreSizing();
         store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
         // add the heap size of active (mutable) segment
-        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD);
+        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
         size = store.memstore.getFlushableSize();
         assertEquals(kvSize, size);
         // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
@@ -278,12 +278,12 @@ public class TestHStore {
         }
         // due to snapshot, change mutable to immutable segment
         kvSize.incMemStoreSize(0,
-            CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD);
+            CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD, 0);
         size = store.memstore.getFlushableSize();
         assertEquals(kvSize, size);
         MemStoreSizing kvSize2 = new MemStoreSizing();
         store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
-        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD);
+        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0);
         // Even though we add a new kv, we expect the flushable size to be 'same' since we have
         // not yet cleared the snapshot -- the above flush failed.
         assertEquals(kvSize, size);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 353cc28..fded9ba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -149,7 +149,7 @@ public class TestPerColumnFamilyFlush {
       }
     }
 
-    long totalMemstoreSize = region.getMemStoreSize();
+    long totalMemstoreSize = region.getMemStoreDataSize();
 
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
@@ -192,7 +192,7 @@ public class TestPerColumnFamilyFlush {
     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
     cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
     cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
-    totalMemstoreSize = region.getMemStoreSize();
+    totalMemstoreSize = region.getMemStoreDataSize();
     smallestSeqInRegionCurrentMemstore = getWAL(region)
         .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
@@ -230,7 +230,7 @@ public class TestPerColumnFamilyFlush {
     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
     cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
     cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
-    totalMemstoreSize = region.getMemStoreSize();
+    totalMemstoreSize = region.getMemStoreDataSize();
     smallestSeqInRegionCurrentMemstore = getWAL(region)
         .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
@@ -265,7 +265,7 @@ public class TestPerColumnFamilyFlush {
 
     // Since we won't find any CF above the threshold, and hence no specific
     // store to flush, we should flush all the memstores.
-    assertEquals(0, region.getMemStoreSize());
+    assertEquals(0, region.getMemStoreDataSize());
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
@@ -289,7 +289,7 @@ public class TestPerColumnFamilyFlush {
       }
     }
 
-    long totalMemstoreSize = region.getMemStoreSize();
+    long totalMemstoreSize = region.getMemStoreDataSize();
 
     // Find the sizes of the memstores of each CF.
     MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
@@ -312,7 +312,7 @@ public class TestPerColumnFamilyFlush {
     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
     cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
     cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
-    totalMemstoreSize = region.getMemStoreSize();
+    totalMemstoreSize = region.getMemStoreDataSize();
     long smallestSeqInRegionCurrentMemstore =
         region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
@@ -381,7 +381,7 @@ public class TestPerColumnFamilyFlush {
 
       long totalMemstoreSize;
       long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
-      totalMemstoreSize = desiredRegion.getMemStoreSize();
+      totalMemstoreSize = desiredRegion.getMemStoreDataSize();
 
       // Find the sizes of the memstores of each CF.
       cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
@@ -504,12 +504,12 @@ public class TestPerColumnFamilyFlush {
 
         @Override
         public boolean evaluate() throws Exception {
-          return desiredRegion.getMemStoreSize() == 0;
+          return desiredRegion.getMemStoreDataSize() == 0;
         }
 
         @Override
         public String explainFailure() throws Exception {
-          long memstoreSize = desiredRegion.getMemStoreSize();
+          long memstoreSize = desiredRegion.getMemStoreDataSize();
           if (memstoreSize > 0) {
             return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
           }
@@ -551,7 +551,7 @@ public class TestPerColumnFamilyFlush {
       put.addColumn(FAMILY3, qf, value3);
       table.put(put);
       // slow down to let regionserver flush region.
-      while (region.getMemStoreSize() > memstoreFlushSize) {
+      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
         Thread.sleep(100);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java
index ede9cae..7bd9e16 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java
@@ -42,7 +42,7 @@ public class TestRegionServerAccounting {
     // try for default cases
     RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
     MemStoreSize memstoreSize =
-        new MemStoreSize(3L * 1024 * 1024 * 1024, 1L * 1024 * 1024 * 1024);
+        new MemStoreSize((3L * 1024L * 1024L * 1024L), (1L * 1024L * 1024L * 1024L), 0);
     regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
     assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK,
       regionServerAccounting.isAboveHighWaterMark());
@@ -55,7 +55,7 @@ public class TestRegionServerAccounting {
     // try for default cases
     RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
     MemStoreSize memstoreSize =
-        new MemStoreSize(3L * 1024 * 1024 * 1024, 1L * 1024 * 1024 * 1024);
+        new MemStoreSize((3L * 1024L * 1024L * 1024L), (1L * 1024L * 1024L * 1024L), 0);
     regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
     assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK,
       regionServerAccounting.isAboveLowWaterMark());
@@ -65,12 +65,12 @@ public class TestRegionServerAccounting {
   public void testOffheapMemstoreHigherWaterMarkLimitsDueToDataSize() {
     Configuration conf = HBaseConfiguration.create();
     // setting 1G as offheap data size
-    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024));
+    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024L));
     // try for default cases
     RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
     // this will breach offheap limit as data size is higher and not due to heap size
     MemStoreSize memstoreSize =
-        new MemStoreSize(3L * 1024 * 1024 * 1024, 1L * 1024 * 1024 * 1024);
+        new MemStoreSize((3L * 1024L * 1024L * 1024L), 0, (1L * 1024L * 1024L * 1024L));
     regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
     assertEquals(FlushType.ABOVE_OFFHEAP_HIGHER_MARK,
       regionServerAccounting.isAboveHighWaterMark());
@@ -81,12 +81,12 @@ public class TestRegionServerAccounting {
     Configuration conf = HBaseConfiguration.create();
     conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f);
     // setting 1G as offheap data size
-    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024));
+    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024L));
     // try for default cases
     RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
     // this will breach higher limit as heap size is higher and not due to offheap size
     MemStoreSize memstoreSize =
-        new MemStoreSize(3L * 1024 * 1024, 2L * 1024 * 1024 * 1024);
+        new MemStoreSize((3L * 1024L * 1024L), (2L * 1024L * 1024L * 1024L), 0);
     regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
     assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK,
       regionServerAccounting.isAboveHighWaterMark());
@@ -96,12 +96,12 @@ public class TestRegionServerAccounting {
   public void testOffheapMemstoreLowerWaterMarkLimitsDueToDataSize() {
     Configuration conf = HBaseConfiguration.create();
     // setting 1G as offheap data size
-    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024));
+    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024L));
     // try for default cases
     RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
     // this will breach offheap limit as data size is higher and not due to heap size
     MemStoreSize memstoreSize =
-        new MemStoreSize(3L * 1024 * 1024 * 1024, 1L * 1024 * 1024 * 1024);
+        new MemStoreSize((3L * 1024L * 1024L * 1024L), 0, (1L * 1024L * 1024L * 1024L));
     regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
     assertEquals(FlushType.ABOVE_OFFHEAP_LOWER_MARK,
       regionServerAccounting.isAboveLowWaterMark());
@@ -112,12 +112,12 @@ public class TestRegionServerAccounting {
     Configuration conf = HBaseConfiguration.create();
     conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f);
     // setting 1G as offheap data size
-    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024));
+    conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1L * 1024L));
     // try for default cases
     RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
     // this will breach higher limit as heap size is higher and not due to offheap size
     MemStoreSize memstoreSize =
-        new MemStoreSize(3L * 1024 * 1024, 2L * 1024 * 1024 * 1024);
+        new MemStoreSize((3L * 1024L * 1024L), (2L * 1024L * 1024L * 1024L), 0);
     regionServerAccounting.incGlobalMemStoreSize(memstoreSize);
     assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK,
       regionServerAccounting.isAboveLowWaterMark());

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index ca65914..5eb8fa8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -264,8 +264,8 @@ public class TestWALLockup {
         @Override
         public void run() {
           try {
-            if (region.getMemStoreSize() <= 0) {
-              throw new IOException("memstore size=" + region.getMemStoreSize());
+            if (region.getMemStoreDataSize() <= 0) {
+              throw new IOException("memstore size=" + region.getMemStoreDataSize());
             }
             region.flush(false);
           } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 3f73d37..15bf2a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -172,7 +172,7 @@ public class TestWalAndCompactingMemStoreFlush {
       region.put(createPut(2, i));
     }
 
-    long totalMemstoreSize = region.getMemStoreSize();
+    long totalMemstoreSize = region.getMemStoreDataSize();
 
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
@@ -363,13 +363,13 @@ public class TestWalAndCompactingMemStoreFlush {
     s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
         + smallestSeqInRegionCurrentMemstorePhaseV
         + ". After additional inserts and last flush, the entire region size is:" + region
-        .getMemStoreSize()
+        .getMemStoreDataSize()
         + "\n----------------------------------\n";
 
     // Since we won't find any CF above the threshold, and hence no specific
     // store to flush, we should flush all the memstores
     // Also compacted memstores are flushed to disk.
-    assertEquals(0, region.getMemStoreSize());
+    assertEquals(0, region.getMemStoreDataSize());
     System.out.println(s);
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
@@ -411,7 +411,7 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /*------------------------------------------------------------------------------*/
     /* PHASE I - collect sizes */
-    long totalMemstoreSizePhaseI = region.getMemStoreSize();
+    long totalMemstoreSizePhaseI = region.getMemStoreDataSize();
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
@@ -474,7 +474,7 @@ public class TestWalAndCompactingMemStoreFlush {
         .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
-    long totalMemstoreSizePhaseII = region.getMemStoreSize();
+    long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
 
     /*------------------------------------------------------------------------------*/
     /* PHASE II - validation */
@@ -517,7 +517,7 @@ public class TestWalAndCompactingMemStoreFlush {
     /* PHASE III - collect sizes */
     // How much does the CF1 memstore occupy now? Will be used later.
     MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
-    long totalMemstoreSizePhaseIII = region.getMemStoreSize();
+    long totalMemstoreSizePhaseIII = region.getMemStoreDataSize();
 
     /*------------------------------------------------------------------------------*/
     /* PHASE III - validation */
@@ -575,7 +575,7 @@ public class TestWalAndCompactingMemStoreFlush {
     MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
     long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
         .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
-    long totalMemstoreSizePhaseV = region.getMemStoreSize();
+    long totalMemstoreSizePhaseV = region.getMemStoreDataSize();
 
     /*------------------------------------------------------------------------------*/
     /* PHASE V - validation */
@@ -663,7 +663,7 @@ public class TestWalAndCompactingMemStoreFlush {
     ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
     ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
 
-    long totalMemstoreSize = region.getMemStoreSize();
+    long totalMemstoreSize = region.getMemStoreDataSize();
 
     // Find the sizes of the memstores of each CF.
     MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
@@ -794,7 +794,7 @@ public class TestWalAndCompactingMemStoreFlush {
       region.put(createPut(2, i));
     }
 
-    long totalMemstoreSize = region.getMemStoreSize();
+    long totalMemstoreSize = region.getMemStoreDataSize();
 
     // test in-memory flashing into CAM here
     ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).setIndexType(


[2/2] hbase git commit: HBASE-18294 Reduce global heap pressure: flush based on heap occupancy

Posted by es...@apache.org.
HBASE-18294 Reduce global heap pressure: flush based on heap occupancy


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f3bb9b96
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f3bb9b96
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f3bb9b96

Branch: refs/heads/master
Commit: f3bb9b9613beaefbee0a53be1fb66b94fde1ce19
Parents: f3ff55a
Author: eshcar <es...@yahoo-inc.com>
Authored: Sun Feb 18 09:55:44 2018 +0200
Committer: eshcar <es...@yahoo-inc.com>
Committed: Sun Feb 18 09:55:44 2018 +0200

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionUtils.java    |   2 +-
 .../apache/hadoop/hbase/client/Mutation.java    |   2 +-
 .../org/apache/hadoop/hbase/client/Result.java  |   2 +-
 .../apache/hadoop/hbase/ByteBufferKeyValue.java |   2 +-
 .../java/org/apache/hadoop/hbase/CellUtil.java  |   2 +-
 .../apache/hadoop/hbase/PrivateCellUtil.java    |   7 +-
 .../hbase/util/MapReduceExtendedCell.java       |   2 +-
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java  |   2 +-
 .../hbase/regionserver/AbstractMemStore.java    |   4 +-
 .../regionserver/CSLMImmutableSegment.java      |   3 +-
 .../regionserver/CellArrayImmutableSegment.java |   9 +-
 .../regionserver/CellChunkImmutableSegment.java |  52 ++++++-
 .../hbase/regionserver/CompactingMemStore.java  |   8 +-
 .../hbase/regionserver/CompactionPipeline.java  |  47 +++---
 .../regionserver/CompositeImmutableSegment.java |  12 +-
 .../hbase/regionserver/DefaultMemStore.java     |   4 +-
 .../regionserver/FlushAllLargeStoresPolicy.java |   2 +-
 .../regionserver/FlushLargeStoresPolicy.java    |  52 ++++---
 .../FlushNonSloppyStoresFirstPolicy.java        |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 144 ++++++++++---------
 .../hbase/regionserver/HRegionServer.java       |  33 ++++-
 .../hadoop/hbase/regionserver/HStore.java       |   2 +-
 .../regionserver/ImmutableMemStoreLAB.java      |  12 ++
 .../hbase/regionserver/ImmutableSegment.java    |   4 +
 .../hbase/regionserver/MemStoreFlusher.java     |  92 +++++++++---
 .../hadoop/hbase/regionserver/MemStoreLAB.java  |   8 +-
 .../hbase/regionserver/MemStoreLABImpl.java     |  10 ++
 .../hadoop/hbase/regionserver/MemStoreSize.java |  52 ++++++-
 .../hbase/regionserver/MemStoreSizing.java      |  58 +++-----
 .../hbase/regionserver/MemStoreSnapshot.java    |  16 +--
 .../MetricsTableWrapperAggregateImpl.java       |   2 +-
 .../hbase/regionserver/MutableSegment.java      |   7 +-
 .../hadoop/hbase/regionserver/Region.java       |  16 ++-
 .../regionserver/RegionServerAccounting.java    |  45 +++---
 .../regionserver/RegionServicesForStores.java   |   4 +-
 .../hadoop/hbase/regionserver/Segment.java      | 115 ++++++++++-----
 .../hadoop/hbase/regionserver/StoreScanner.java |   2 +-
 .../org/apache/hadoop/hbase/wal/WALEdit.java    |   2 +-
 .../hadoop/hbase/TestGlobalMemStoreSize.java    |   4 +-
 .../hbase/TestPartialResultsFromClientSide.java |   2 +-
 ...TestServerSideScanMetricsFromClientSide.java |   3 +-
 .../hbase/client/TestAsyncRegionAdminApi.java   |  12 +-
 .../hadoop/hbase/client/TestClientPushback.java |   6 +-
 .../hbase/client/TestFlushFromClient.java       |  14 +-
 .../hadoop/hbase/client/TestSizeFailures.java   |   6 +-
 ...NegativeMemStoreSizeWithSlowCoprocessor.java |   3 +-
 .../regionserver/TestCompactingMemStore.java    |   8 +-
 .../TestCompactingToCellFlatMapMemStore.java    |   2 +-
 .../TestEndToEndSplitTransaction.java           |   2 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |  20 +--
 .../regionserver/TestHRegionReplayEvents.java   |  36 ++---
 .../hadoop/hbase/regionserver/TestHStore.java   |   6 +-
 .../regionserver/TestPerColumnFamilyFlush.java  |  20 +--
 .../TestRegionServerAccounting.java             |  20 +--
 .../hbase/regionserver/TestWALLockup.java       |   4 +-
 .../TestWalAndCompactingMemStoreFlush.java      |  18 +--
 56 files changed, 649 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 1a093f8..c9e994f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -320,7 +320,7 @@ public final class ConnectionUtils {
     long estimatedHeapSizeOfResult = 0;
     // We don't make Iterator here
     for (Cell cell : rs.rawCells()) {
-      estimatedHeapSizeOfResult += PrivateCellUtil.estimatedHeapSizeOf(cell);
+      estimatedHeapSizeOfResult += PrivateCellUtil.estimatedSizeOfCell(cell);
     }
     return estimatedHeapSizeOfResult;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 4398fd6..09000ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -488,7 +488,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
           size * ClassSize.REFERENCE);
 
       for(Cell cell : entry.getValue()) {
-        heapsize += PrivateCellUtil.estimatedHeapSizeOf(cell);
+        heapsize += PrivateCellUtil.estimatedSizeOfCell(cell);
       }
     }
     heapsize += getAttributeSize();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index d30c25f..832689e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -859,7 +859,7 @@ public class Result implements CellScannable, CellScanner {
       return size;
     }
     for (Cell c : result.rawCells()) {
-      size += PrivateCellUtil.estimatedHeapSizeOf(c);
+      size += PrivateCellUtil.estimatedSizeOfCell(c);
     }
     return size;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java
index c82ed8d..760d02c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java
@@ -255,7 +255,7 @@ public class ByteBufferKeyValue extends ByteBufferExtendedCell {
     if (this.buf.hasArray()) {
       return ClassSize.align(FIXED_OVERHEAD + length);
     }
-    return ClassSize.align(FIXED_OVERHEAD);
+    return ClassSize.align(FIXED_OVERHEAD) + KeyValueUtil.length(this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 7e13885..de39fcc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -1016,7 +1016,7 @@ public final class CellUtil {
    */
   @Deprecated
   public static long estimatedHeapSizeOf(final Cell cell) {
-    return PrivateCellUtil.estimatedHeapSizeOf(cell);
+    return PrivateCellUtil.estimatedSizeOfCell(cell);
   }
 
   /********************* tags *************************************/

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
index d48ab60..4ebe26c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
@@ -250,7 +250,7 @@ public final class PrivateCellUtil {
 
     @Override
     public long heapSize() {
-      long sum = HEAP_SIZE_OVERHEAD + estimatedHeapSizeOf(cell);
+      long sum = HEAP_SIZE_OVERHEAD + estimatedSizeOfCell(cell);
       if (this.tags != null) {
         sum += ClassSize.sizeOf(this.tags);
       }
@@ -446,7 +446,7 @@ public final class PrivateCellUtil {
 
     @Override
     public long heapSize() {
-      long sum = HEAP_SIZE_OVERHEAD + estimatedHeapSizeOf(cell);
+      long sum = HEAP_SIZE_OVERHEAD + estimatedSizeOfCell(cell);
       // this.tags is on heap byte[]
       if (this.tags != null) {
         sum += ClassSize.sizeOf(this.tags);
@@ -2783,10 +2783,11 @@ public final class PrivateCellUtil {
    * {@link HeapSize} we call {@link HeapSize#heapSize()} so cell can give a correct value. In other
    * cases we just consider the bytes occupied by the cell components ie. row, CF, qualifier,
    * timestamp, type, value and tags.
+   * Note that this can be the JVM heap space (on-heap) or the OS heap (off-heap)
    * @param cell
    * @return estimate of the heap space
    */
-  public static long estimatedHeapSizeOf(final Cell cell) {
+  public static long estimatedSizeOfCell(final Cell cell) {
     if (cell instanceof HeapSize) {
       return ((HeapSize) cell).heapSize();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java
index 73eb7d8..75b57f4 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java
@@ -241,7 +241,7 @@ public class MapReduceExtendedCell extends ByteBufferExtendedCell {
 
   @Override
   public long heapSize() {
-    return PrivateCellUtil.estimatedHeapSizeOf(cell);
+    return PrivateCellUtil.estimatedSizeOfCell(cell);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 7b8815f..e8818be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -266,7 +266,7 @@ public class HFileBlockIndex {
 
         // Adding blockKeys
         for (Cell key : blockKeys) {
-          heapSize += ClassSize.align(PrivateCellUtil.estimatedHeapSizeOf(key));
+          heapSize += ClassSize.align(PrivateCellUtil.estimatedSizeOfCell(key));
         }
       }
       // Add comparator and the midkey atomicreference

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 6dbe0a8..e6fd04d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -171,7 +171,9 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
   MemStoreSizing getSnapshotSizing() {
-    return new MemStoreSizing(this.snapshot.keySize(), this.snapshot.heapSize());
+    return new MemStoreSizing(this.snapshot.keySize(),
+        this.snapshot.heapSize(),
+        this.snapshot.offHeapSize());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
index b5fe033..6af84cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
@@ -39,7 +39,8 @@ public class CSLMImmutableSegment extends ImmutableSegment {
   protected CSLMImmutableSegment(Segment segment) {
     super(segment);
     // update the segment metadata heap size
-    incSize(0, -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM);
+    long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM;
+    incSize(0, indexOverhead, 0); // CSLM is always on-heap
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
index 7e00899..4631200 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -45,7 +45,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
   protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
       MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
     super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
-    incSize(0, DEEP_OVERHEAD_CAM);
+    incSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap
     // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
     initializeCellSet(numOfCells, iterator, action);
   }
@@ -58,7 +58,8 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
   protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing,
       MemStoreCompactionStrategy.Action action) {
     super(segment); // initiailize the upper class
-    incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
+    long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
+    incSize(0, indexOverhead, 0); // CAM is always on-heap
     int numOfCells = segment.getCellsCount();
     // build the new CellSet based on CellChunkMap and update the CellSet of this Segment
     reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
@@ -66,8 +67,8 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
     // arrange the meta-data size, decrease all meta-data sizes related to SkipList;
     // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
     long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
-    incSize(0, newSegmentSizeDelta);
-    memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
+    incSize(0, newSegmentSizeDelta, 0);
+    memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
index bf9b191..53458f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -53,7 +53,15 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
   protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
       MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
     super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
-    incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
+    long indexOverhead = DEEP_OVERHEAD_CCM;
+    // memStoreLAB cannot be null in this class
+    boolean onHeap = getMemStoreLAB().isOnHeap();
+    // initiate the heapSize with the size of the segment metadata
+    if(onHeap) {
+      incSize(0, indexOverhead, 0);
+    } else {
+      incSize(0, 0, indexOverhead);
+    }
     // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
     initializeCellSet(numOfCells, iterator, action);
   }
@@ -66,7 +74,15 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
   protected CellChunkImmutableSegment(CSLMImmutableSegment segment,
       MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
     super(segment); // initiailize the upper class
-    incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
+    long indexOverhead = -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + DEEP_OVERHEAD_CCM;
+    // memStoreLAB cannot be null in this class
+    boolean onHeap = getMemStoreLAB().isOnHeap();
+    // initiate the heapSize with the size of the segment metadata
+    if(onHeap) {
+      incSize(0, indexOverhead, 0);
+    } else {
+      incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM);
+    }
     int numOfCells = segment.getCellsCount();
     // build the new CellSet based on CellChunkMap
     reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
@@ -75,9 +91,32 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
     // add sizes of CellChunkMap entry, decrease also Cell object sizes
     // (reinitializeCellSet doesn't take the care for the sizes)
     long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    if(onHeap) {
+      incSize(0, newSegmentSizeDelta, 0);
+      memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
+    } else {
+      incSize(0, 0, newSegmentSizeDelta);
+      memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta);
+
+    }
+  }
 
-    incSize(0, newSegmentSizeDelta);
-    memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
+  @Override
+  protected long indexEntryOnHeapSize(boolean onHeap) {
+    if(onHeap) {
+      return indexEntrySize();
+    }
+    // else the index is allocated off-heap
+    return 0;
+  }
+
+  @Override
+  protected long indexEntryOffHeapSize(boolean offHeap) {
+    if(offHeap) {
+      return indexEntrySize();
+    }
+    // else the index is allocated on-heap
+    return 0;
   }
 
   @Override
@@ -257,13 +296,16 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
     // The actual size of the cell is not added yet, and will be added (only in compaction)
     // in initializeCellSet#updateMetaInfo().
     long oldHeapSize = heapSizeChange(cell, true);
+    long oldOffHeapSize = offHeapSizeChange(cell, true);
     long oldCellSize = getCellLength(cell);
     cell = maybeCloneWithAllocator(cell, true);
     long newHeapSize = heapSizeChange(cell, true);
+    long newOffHeapSize = offHeapSizeChange(cell, true);
     long newCellSize = getCellLength(cell);
     long heapOverhead = newHeapSize - oldHeapSize;
+    long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
     //TODO: maybe need to update the dataSize of the region
-    incSize(newCellSize - oldCellSize, heapOverhead);
+    incSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
     return cell;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 3cb4103..bcecdc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -157,9 +157,9 @@ public class CompactingMemStore extends AbstractMemStore {
   @Override
   public MemStoreSize size() {
     MemStoreSizing memstoreSizing = new MemStoreSizing();
-    memstoreSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
+    memstoreSizing.incMemStoreSize(active.getMemStoreSize());
     for (Segment item : pipeline.getSegments()) {
-      memstoreSizing.incMemStoreSize(item.keySize(), item.heapSize());
+      memstoreSizing.incMemStoreSize(item.getMemStoreSize());
     }
     return memstoreSizing;
   }
@@ -231,13 +231,13 @@ public class CompactingMemStore extends AbstractMemStore {
       // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
       if (compositeSnapshot) {
         snapshotSizing = pipeline.getPipelineSizing();
-        snapshotSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
+        snapshotSizing.incMemStoreSize(active.getMemStoreSize());
       } else {
         snapshotSizing = pipeline.getTailSizing();
       }
     }
     return snapshotSizing.getDataSize() > 0 ? snapshotSizing
-        : new MemStoreSize(this.active.keySize(), this.active.heapSize());
+        : new MemStoreSize(active.getMemStoreSize());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 461f900..adf57d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -138,16 +138,25 @@ public class CompactionPipeline {
       if(segment != null) newDataSize = segment.keySize();
       long dataSizeDelta = suffixDataSize - newDataSize;
       long suffixHeapSize = getSegmentsHeapSize(suffix);
+      long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
       long newHeapSize = 0;
-      if(segment != null) newHeapSize = segment.heapSize();
+      long newOffHeapSize = 0;
+      if(segment != null) {
+        newHeapSize = segment.heapSize();
+        newOffHeapSize = segment.offHeapSize();
+      }
+      long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
       long heapSizeDelta = suffixHeapSize - newHeapSize;
-      region.addMemStoreSize(new MemStoreSizing(-dataSizeDelta, -heapSizeDelta));
-      LOG.debug("Suffix data size={}, new segment data size={}, suffix heap size={}," +
-              "new segment heap size={}",
-          suffixDataSize,
-          newDataSize,
-          suffixHeapSize,
-          newHeapSize);
+      region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta));
+      LOG.debug("Suffix data size={}, new segment data size={}, "
+              + "suffix heap size={}," + "new segment heap size={}"
+              + "suffix off heap size={}," + "new segment off heap size={}"
+          , suffixDataSize
+          , newDataSize
+          , suffixHeapSize
+          , newHeapSize
+          , suffixOffHeapSize
+          , newOffHeapSize);
     }
     return true;
   }
@@ -160,6 +169,14 @@ public class CompactionPipeline {
     return res;
   }
 
+  private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
+    long res = 0;
+    for (Segment segment : list) {
+      res += segment.offHeapSize();
+    }
+    return res;
+  }
+
   private static long getSegmentsKeySize(List<? extends Segment> list) {
     long res = 0;
     for (Segment segment : list) {
@@ -201,7 +218,8 @@ public class CompactionPipeline {
           if(region != null) {
             // update the global memstore size counter
             // upon flattening there is no change in the data size
-            region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize()));
+            region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize(),
+                newMemstoreAccounting.getOffHeapSize()));
           }
           LOG.debug("Compaction pipeline segment {} flattened", s);
           return true;
@@ -239,19 +257,16 @@ public class CompactionPipeline {
   public MemStoreSizing getTailSizing() {
     LinkedList<? extends Segment> localCopy = readOnlyCopy;
     if (localCopy.isEmpty()) return new MemStoreSizing();
-    return new MemStoreSizing(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize());
+    return new MemStoreSizing(localCopy.peekLast().getMemStoreSize());
   }
 
   public MemStoreSizing getPipelineSizing() {
-    long keySize = 0;
-    long heapSize = 0;
+    MemStoreSizing memStoreSizing = new MemStoreSizing();
     LinkedList<? extends Segment> localCopy = readOnlyCopy;
-    if (localCopy.isEmpty()) return new MemStoreSizing();
     for (Segment segment : localCopy) {
-      keySize += segment.keySize();
-      heapSize += segment.heapSize();
+      memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
     }
-    return new MemStoreSizing(keySize, heapSize);
+    return memStoreSizing;
   }
 
   private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
index 8bd990a..b6bbb59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -43,7 +43,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
   private long keySize = 0;
 
   public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
-    super(comparator);
+    super(comparator, segments);
     this.segments = segments;
     for (ImmutableSegment s : segments) {
       this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
@@ -87,14 +87,6 @@ public class CompositeImmutableSegment extends ImmutableSegment {
   }
 
   /**
-   * @return the first cell in the segment that has equal or greater key than the given cell
-   */
-  @Override
-  public Cell getFirstAfter(Cell cell) {
-    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
-  }
-
-  /**
    * Closing a segment before it is being discarded
    */
   @Override
@@ -206,7 +198,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
    * Updates the heap size counter of the segment by the given delta
    */
   @Override
-  protected void incSize(long delta, long heapOverhead) {
+  protected void incSize(long delta, long heapOverhead, long offHeapOverhead) {
     throw new IllegalStateException("Not supported by CompositeImmutableScanner");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 061e4d0..9ef6a6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -106,7 +106,7 @@ public class DefaultMemStore extends AbstractMemStore {
   public MemStoreSize getFlushableSize() {
     MemStoreSize snapshotSize = getSnapshotSize();
     return snapshotSize.getDataSize() > 0 ? snapshotSize
-        : new MemStoreSize(keySize(), heapSize());
+        : new MemStoreSize(active.getMemStoreSize());
   }
 
   @Override
@@ -155,7 +155,7 @@ public class DefaultMemStore extends AbstractMemStore {
 
   @Override
   public MemStoreSize size() {
-    return new MemStoreSize(this.active.keySize(), this.active.heapSize());
+    return new MemStoreSize(active.getMemStoreSize());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
index 0f01178..1ca20a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
@@ -44,7 +44,7 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy {
       // Family number might also be zero in some of our unit test case
       return;
     }
-    this.flushSizeLowerBound = getFlushSizeLowerBound(region);
+    setFlushSizeLowerBounds(region);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
index 2d2de24..4da1857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -43,10 +43,11 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
 
   protected long flushSizeLowerBound = -1;
 
-  protected long getFlushSizeLowerBound(HRegion region) { int familyNumber = region.getTableDescriptor().getColumnFamilyCount();
+  protected void setFlushSizeLowerBounds(HRegion region) {
+    int familyNumber = region.getTableDescriptor().getColumnFamilyCount();
     // For multiple families, lower bound is the "average flush size" by default
     // unless setting in configuration is larger.
-    long flushSizeLowerBound = region.getMemStoreFlushSize() / familyNumber;
+    flushSizeLowerBound = region.getMemStoreFlushSize() / familyNumber;
     long minimumLowerBound =
         getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
           DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN);
@@ -57,36 +58,45 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
     String flushedSizeLowerBoundString =
         region.getTableDescriptor().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
     if (flushedSizeLowerBoundString == null) {
-      LOG.debug("No {} set in table {} descriptor;" +
-          "using region.getMemStoreFlushSize/# of families ({}) instead.",
-          HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
-          region.getTableDescriptor().getTableName(),
-          StringUtils.humanSize(flushSizeLowerBound) + ")");
+      LOG.debug("No {} set in table {} descriptor;"
+          + "using region.getMemStoreFlushHeapSize/# of families ({}) "
+          + "instead."
+          , HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
+          , region.getTableDescriptor().getTableName()
+          , StringUtils.humanSize(flushSizeLowerBound)
+          + ")");
     } else {
       try {
         flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
       } catch (NumberFormatException nfe) {
         // fall back for fault setting
-        LOG.warn("Number format exception parsing {} for table {}: {}, {}; " +
-            "using region.getMemStoreFlushSize/# of families ({}) instead.",
-            HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
-            region.getTableDescriptor().getTableName(),
-            flushedSizeLowerBoundString,
-            nfe,
-            flushSizeLowerBound);
+        LOG.warn("Number format exception parsing {} for table {}: {}, {}; "
+                + "using region.getMemStoreFlushHeapSize/# of families ({}) "
+                + "and region.getMemStoreFlushOffHeapSize/# of families ({}) "
+                + "instead."
+            , HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
+            , region.getTableDescriptor().getTableName()
+            , flushedSizeLowerBoundString
+            , nfe
+            , flushSizeLowerBound
+        );
 
       }
     }
-    return flushSizeLowerBound;
   }
 
   protected boolean shouldFlush(HStore store) {
-    if (store.getMemStoreSize().getDataSize() > this.flushSizeLowerBound) {
-      LOG.debug("Flush {} of {}; memstoreSize={} > lowerBound={}",
-          store.getColumnFamilyName(),
-          region.getRegionInfo().getEncodedName(),
-          store.getMemStoreSize().getDataSize(),
-          this.flushSizeLowerBound);
+    if (store.getMemStoreSize().getHeapSize()
+        + store.getMemStoreSize().getOffHeapSize() > this.flushSizeLowerBound) {
+      LOG.debug("Flush {} of {}; "
+              + "heap memstoreSize={} +"
+              + "off heap memstoreSize={} > memstore lowerBound={}"
+          , store.getColumnFamilyName()
+          , region.getRegionInfo().getEncodedName()
+          , store.getMemStoreSize().getHeapSize()
+          , store.getMemStoreSize().getOffHeapSize()
+          , this.flushSizeLowerBound
+      );
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
index ed23e3d..e95de9d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
@@ -63,7 +63,7 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
   @Override
   protected void configureForRegion(HRegion region) {
     super.configureForRegion(region);
-    this.flushSizeLowerBound = getFlushSizeLowerBound(region);
+    setFlushSizeLowerBounds(region);
     for (HStore store : region.stores.values()) {
       if (store.isSloppyMemStore()) {
         sloppyStores.add(store);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9464fdb..e26cc43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -65,7 +65,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -287,7 +286,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // TODO: account for each registered handler in HeapSize computation
   private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
 
-  private final AtomicLong memstoreDataSize = new AtomicLong(0);// Track data size in all memstores
+  // Track data size in all memstores
+  private final MemStoreSizing memStoreSize = new MemStoreSizing();
   private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
 
   // Debug possible data loss due to WAL off
@@ -829,12 +829,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     if (flushSize <= 0) {
       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
-        TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
+          TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
     }
     this.memstoreFlushSize = flushSize;
-    this.blockingMemStoreSize = this.memstoreFlushSize *
-        conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
-                HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
+    long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+        HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
+    this.blockingMemStoreSize = this.memstoreFlushSize * mult;
   }
 
   /**
@@ -1192,32 +1192,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Increase the size of mem store in this region and the size of global mem
    * store
-   * @return the size of memstore in this region
    */
-  public long addAndGetMemStoreSize(MemStoreSize memstoreSize) {
+  public void incMemStoreSize(MemStoreSize memStoreSize) {
     if (this.rsAccounting != null) {
-      rsAccounting.incGlobalMemStoreSize(memstoreSize);
+      rsAccounting.incGlobalMemStoreSize(memStoreSize);
     }
-    long size = this.memstoreDataSize.addAndGet(memstoreSize.getDataSize());
-    checkNegativeMemStoreDataSize(size, memstoreSize.getDataSize());
-    return size;
+    long dataSize;
+    synchronized (this.memStoreSize) {
+      this.memStoreSize.incMemStoreSize(memStoreSize);
+      dataSize = this.memStoreSize.getDataSize();
+    }
+    checkNegativeMemStoreDataSize(dataSize, memStoreSize.getDataSize());
   }
 
-  public void decrMemStoreSize(MemStoreSize memstoreSize) {
+  public void decrMemStoreSize(MemStoreSize memStoreSize) {
     if (this.rsAccounting != null) {
-      rsAccounting.decGlobalMemStoreSize(memstoreSize);
+      rsAccounting.decGlobalMemStoreSize(memStoreSize);
+    }
+    long size;
+    synchronized (this.memStoreSize) {
+      this.memStoreSize.decMemStoreSize(memStoreSize);
+      size = this.memStoreSize.getDataSize();
     }
-    long size = this.memstoreDataSize.addAndGet(-memstoreSize.getDataSize());
-    checkNegativeMemStoreDataSize(size, -memstoreSize.getDataSize());
+    checkNegativeMemStoreDataSize(size, -memStoreSize.getDataSize());
   }
 
-  private void checkNegativeMemStoreDataSize(long memstoreDataSize, long delta) {
-    // This is extremely bad if we make memstoreSize negative. Log as much info on the offending
+  private void checkNegativeMemStoreDataSize(long memStoreDataSize, long delta) {
+    // This is extremely bad if we make memStoreSize negative. Log as much info on the offending
     // caller as possible. (memStoreSize might be a negative value already -- freeing memory)
-    if (memstoreDataSize < 0) {
+    if (memStoreDataSize < 0) {
       LOG.error("Asked to modify this region's (" + this.toString()
-          + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
-          + (memstoreDataSize - delta) + ", delta=" + delta, new Exception());
+          + ") memStoreSize to a negative value which is incorrect. Current memStoreSize="
+          + (memStoreDataSize - delta) + ", delta=" + delta, new Exception());
     }
   }
 
@@ -1250,8 +1256,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   @Override
-  public long getMemStoreSize() {
-    return memstoreDataSize.get();
+  public long getMemStoreDataSize() {
+    return memStoreSize.getDataSize();
+  }
+
+  @Override
+  public long getMemStoreHeapSize() {
+    return memStoreSize.getHeapSize();
+  }
+
+  @Override
+  public long getMemStoreOffHeapSize() {
+    return memStoreSize.getOffHeapSize();
   }
 
   /** @return store services for this region, to access services required by store level needs */
@@ -1521,7 +1537,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         int failedfFlushCount = 0;
         int flushCount = 0;
         long tmp = 0;
-        long remainingSize = this.memstoreDataSize.get();
+        long remainingSize = this.memStoreSize.getDataSize();
         while (remainingSize > 0) {
           try {
             internalFlushcache(status);
@@ -1530,7 +1546,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                   " (carrying snapshot?) " + this);
             }
             flushCount++;
-            tmp = this.memstoreDataSize.get();
+            tmp = this.memStoreSize.getDataSize();
             if (tmp >= remainingSize) {
               failedfFlushCount++;
             }
@@ -1570,7 +1586,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               getRegionServerServices().abort("Assertion failed while closing store "
                 + getRegionInfo().getRegionNameAsString() + " " + store
                 + ". flushableSize expected=0, actual= " + flushableSize
-                + ". Current memstoreSize=" + getMemStoreSize() + ". Maybe a coprocessor "
+                + ". Current memStoreSize=" + getMemStoreDataSize() + ". Maybe a coprocessor "
                 + "operation failed and left the memstore in a partially updated state.", null);
             }
           }
@@ -1613,9 +1629,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       this.closed.set(true);
       if (!canFlush) {
-        this.decrMemStoreSize(new MemStoreSizing(memstoreDataSize.get(), getMemStoreHeapSize()));
-      } else if (memstoreDataSize.get() != 0) {
-        LOG.error("Memstore size is " + memstoreDataSize.get());
+        this.decrMemStoreSize(new MemStoreSize(memStoreSize));
+      } else if (memStoreSize.getDataSize() != 0) {
+        LOG.error("Memstore data size is " + memStoreSize.getDataSize());
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-close hooks");
@@ -1635,10 +1651,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  private long getMemStoreHeapSize() {
-    return stores.values().stream().mapToLong(s -> s.getMemStoreSize().getHeapSize()).sum();
-  }
-
   /** Wait for all current flushes and compactions of the region to complete */
   // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for
   // Phoenix needs.
@@ -1752,7 +1764,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     * @return True if its worth doing a flush before we put up the close flag.
     */
   private boolean worthPreFlushing() {
-    return this.memstoreDataSize.get() >
+    return this.memStoreSize.getDataSize() >
       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
   }
 
@@ -2370,12 +2382,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs
     // to no other that it can use to associate with the bulk load. Hence this little dance below
     // to go get one.
-    if (this.memstoreDataSize.get() <= 0) {
+    if (this.memStoreSize.getDataSize() <= 0) {
       // Take an update lock so no edits can come into memory just yet.
       this.updatesLock.writeLock().lock();
       WriteEntry writeEntry = null;
       try {
-        if (this.memstoreDataSize.get() <= 0) {
+        if (this.memStoreSize.getDataSize() <= 0) {
           // Presume that if there are still no edits in the memstore, then there are no edits for
           // this region out in the WAL subsystem so no need to do any trickery clearing out
           // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
@@ -2511,8 +2523,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             .append(StringUtils.byteDesc(store.getFlushableSize().getDataSize()));
       }
     }
-    LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
-        " column families, memstore=" + StringUtils.byteDesc(this.memstoreDataSize.get()) +
+    LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," +
+        " memstore data size=" + StringUtils.byteDesc(this.memStoreSize.getDataSize()) +
+        " memstore heap size=" + StringUtils.byteDesc(this.memStoreSize.getHeapSize()) +
         ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
         ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
   }
@@ -2699,11 +2712,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     long time = EnvironmentEdgeManager.currentTime() - startTime;
-    long memstoresize = this.memstoreDataSize.get();
-    String msg = "Finished memstore flush of ~"
-        + StringUtils.byteDesc(prepareResult.totalFlushableSize.getDataSize()) + "/"
-        + prepareResult.totalFlushableSize.getDataSize() + ", currentsize="
-        + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+    long flushableDataSize = prepareResult.totalFlushableSize.getDataSize();
+    long flushableHeapSize = prepareResult.totalFlushableSize.getHeapSize();
+    long memstoresize = this.memStoreSize.getDataSize();
+    String msg = "Finished memstore flush."
+        + " Flushed data size ~" + StringUtils.byteDesc(flushableDataSize) + "/" + flushableDataSize
+        + " Flushed Heap size ~" + StringUtils.byteDesc(flushableHeapSize) + "/" + flushableHeapSize
+        + ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
         + " for region " + this + " in " + time + "ms, sequenceid="
         + flushOpSeqId +  ", compaction requested=" + compactionRequested
         + ((wal == null) ? "; wal=null" : "");
@@ -3037,7 +3052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         return true;
       });
       // update memStore size
-      region.addAndGetMemStoreSize(memStoreAccounting);
+      region.incMemStoreSize(memStoreAccounting);
     }
 
     public boolean isDone() {
@@ -3806,8 +3821,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           initialized = true;
         }
         doMiniBatchMutate(batchOp);
-        long newSize = this.getMemStoreSize();
-        requestFlushIfNeeded(newSize);
+        requestFlushIfNeeded();
       }
     } finally {
       batchOp.closeRegionOperation();
@@ -4165,7 +4179,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
 
-    if (this.memstoreDataSize.get() > this.blockingMemStoreSize) {
+    if (this.memStoreSize.getHeapSize()
+        + this.memStoreSize.getOffHeapSize() > this.blockingMemStoreSize) {
       blockedRequestsCount.increment();
       requestFlush();
       // Don't print current limit because it will vary too much. The message is used as a key
@@ -4293,8 +4308,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param size
    * @return True if size is over the flush threshold
    */
-  private boolean isFlushSize(final long size) {
-    return size > this.memstoreFlushSize;
+  private boolean isFlushSize(MemStoreSize size) {
+    return size.getHeapSize() + size.getOffHeapSize() > getMemStoreFlushSize();
   }
 
   /**
@@ -4585,7 +4600,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(),
                 memstoreSize);
           }
-          flush = isFlushSize(this.addAndGetMemStoreSize(memstoreSize));
+          incMemStoreSize(memstoreSize);
+          flush = isFlushSize(this.memStoreSize);
           if (flush) {
             internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
               FlushLifeCycleTracker.DUMMY);
@@ -6522,7 +6538,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             scannerContext.incrementBatchProgress(results.size());
             for (Cell cell : results) {
               scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
-                PrivateCellUtil.estimatedHeapSizeOf(cell));
+                PrivateCellUtil.estimatedSizeOfCell(cell));
             }
           }
 
@@ -7264,7 +7280,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return null;
     }
     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
-    stats.setMemStoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this
+    stats.setMemStoreLoad((int) (Math.min(100, (this.memStoreSize.getHeapSize() * 100) / this
         .memstoreFlushSize)));
     if (rsServices.getHeapMemoryManager() != null) {
       // the HeapMemoryManager uses -0.0 to signal a problem asking the JVM,
@@ -7412,8 +7428,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       closeRegionOperation();
       if (!mutations.isEmpty()) {
-        long newSize = this.addAndGetMemStoreSize(memstoreAccounting);
-        requestFlushIfNeeded(newSize);
+        this.incMemStoreSize(memstoreAccounting);
+        requestFlushIfNeeded();
       }
     }
   }
@@ -7566,9 +7582,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         rowLock.release();
       }
       // Request a cache flush if over the limit.  Do it outside update lock.
-      if (isFlushSize(addAndGetMemStoreSize(memstoreAccounting))) {
-        requestFlush();
-      }
+      incMemStoreSize(memstoreAccounting);
+      requestFlushIfNeeded();
       closeRegionOperation(op);
       if (this.metricsRegion != null) {
         switch (op) {
@@ -7894,7 +7909,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
       ClassSize.OBJECT + // closeLock
       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
-      (4 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL,
+      (3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL,
                                     // compactionsFailed
       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
       WriteState.HEAP_SIZE + // writestate
@@ -7935,8 +7950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
     if (coprocessorServiceHandlers.containsKey(serviceName)) {
       LOG.error("Coprocessor service " + serviceName +
-              " already registered, rejecting request from " + instance
-      );
+          " already registered, rejecting request from " + instance);
       return false;
     }
 
@@ -8211,8 +8225,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     dataInMemoryWithoutWAL.add(mutationSize);
   }
 
-  private void lock(final Lock lock)
-      throws RegionTooBusyException, InterruptedIOException {
+  private void lock(final Lock lock) throws RegionTooBusyException, InterruptedIOException {
     lock(lock, 1);
   }
 
@@ -8401,6 +8414,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return this.memstoreFlushSize;
   }
 
+
   //// method for debugging tests
   void throwException(String title, String regionName) {
     StringBuilder buf = new StringBuilder();
@@ -8416,7 +8430,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     buf.append("end-of-stores");
     buf.append(", memstore size ");
-    buf.append(getMemStoreSize());
+    buf.append(getMemStoreDataSize());
     if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) {
       throw new RuntimeException(buf.toString());
     }
@@ -8447,8 +8461,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       RpcServer.getRequestUser().orElse(null));
   }
 
-  private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
-    if (memstoreTotalSize > this.getMemStoreFlushSize()) {
+  private void requestFlushIfNeeded() throws RegionTooBusyException {
+    if(isFlushSize(memStoreSize)) {
       requestFlush();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0d59b12..e2aac03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1606,7 +1606,7 @@ public class HRegionServer extends HasThread implements
     int storefiles = 0;
     int storeUncompressedSizeMB = 0;
     int storefileSizeMB = 0;
-    int memstoreSizeMB = (int) (r.getMemStoreSize() / 1024 / 1024);
+    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
     long storefileIndexSizeKB = 0;
     int rootLevelIndexSizeKB = 0;
     int totalStaticIndexSizeKB = 0;
@@ -2743,11 +2743,11 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
-   * @return A new Map of online regions sorted by region size with the first entry being the
-   * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
-   * may NOT return all regions.
+   * @return A new Map of online regions sorted by region off-heap size with the first entry being
+   *   the biggest.  If two regions are the same size, then the last one found wins; i.e. this
+   *   method may NOT return all regions.
    */
-  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
+  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
     // we'll sort the regions in reverse
     SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
         new Comparator<Long>() {
@@ -2758,7 +2758,28 @@ public class HRegionServer extends HasThread implements
         });
     // Copy over all regions. Regions are sorted by size with biggest first.
     for (HRegion region : this.onlineRegions.values()) {
-      sortedRegions.put(region.getMemStoreSize(), region);
+      sortedRegions.put(region.getMemStoreOffHeapSize(), region);
+    }
+    return sortedRegions;
+  }
+
+  /**
+   * @return A new Map of online regions sorted by region heap size with the first entry being the
+   *   biggest.  If two regions are the same size, then the last one found wins; i.e. this method
+   *   may NOT return all regions.
+   */
+  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
+    // we'll sort the regions in reverse
+    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
+        new Comparator<Long>() {
+          @Override
+          public int compare(Long a, Long b) {
+            return -1 * a.compareTo(b);
+          }
+        });
+    // Copy over all regions. Regions are sorted by size with biggest first.
+    for (HRegion region : this.onlineRegions.values()) {
+      sortedRegions.put(region.getMemStoreHeapSize(), region);
     }
     return sortedRegions;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f283a65..bef50b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2188,7 +2188,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
       this.cacheFlushCount = snapshot.getCellsCount();
       this.cacheFlushSize = snapshot.getDataSize();
       committedFiles = new ArrayList<>(1);
-      return new MemStoreSize(snapshot.getDataSize(), snapshot.getHeapSize());
+      return new MemStoreSize(snapshot.getMemStoreSize());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
index 871f526..71648a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
@@ -133,4 +133,16 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
       checkAndCloseMSLABs(count);
     }
   }
+
+  @Override
+  public boolean isOnHeap() {
+    return !isOffHeap();
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return ChunkCreator.getInstance().isOffheap();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index c899eab..b781aab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -48,6 +48,10 @@ public abstract class ImmutableSegment extends Segment {
     super(comparator, TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC));
   }
 
+  protected ImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
+    super(comparator, segments, TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC));
+  }
+
   /**------------------------------------------------------------------------
    * C-tor to be used to build the derived classes
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index d7c7c5a..6e4191e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -87,6 +87,8 @@ class MemStoreFlusher implements FlushRequester {
   private final FlushHandler[] flushHandlers;
   private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
 
+  private FlushType flushType;
+
   /**
    * @param conf
    * @param server
@@ -116,6 +118,10 @@ class MemStoreFlusher implements FlushRequester {
     return this.updatesBlockedMsHighWater;
   }
 
+  public void setFlushType(FlushType flushType) {
+    this.flushType = flushType;
+  }
+
   /**
    * The memstore across all regions has exceeded the low water mark. Pick
    * one region to flush and flush it synchronously (this is called from the
@@ -123,7 +129,17 @@ class MemStoreFlusher implements FlushRequester {
    * @return true if successful
    */
   private boolean flushOneForGlobalPressure() {
-    SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
+    SortedMap<Long, HRegion> regionsBySize = null;
+    switch(flushType) {
+      case ABOVE_OFFHEAP_HIGHER_MARK:
+      case ABOVE_OFFHEAP_LOWER_MARK:
+        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
+        break;
+      case ABOVE_ONHEAP_HIGHER_MARK:
+      case ABOVE_ONHEAP_LOWER_MARK:
+      default:
+        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
+    }
     Set<HRegion> excludedRegions = new HashSet<>();
 
     double secondaryMultiplier
@@ -147,8 +163,25 @@ class MemStoreFlusher implements FlushRequester {
       }
 
       HRegion regionToFlush;
+      long bestAnyRegionSize;
+      long bestFlushableRegionSize;
+      switch(flushType) {
+        case ABOVE_OFFHEAP_HIGHER_MARK:
+        case ABOVE_OFFHEAP_LOWER_MARK:
+          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
+          bestFlushableRegionSize = bestFlushableRegion.getMemStoreOffHeapSize();
+          break;
+        case ABOVE_ONHEAP_HIGHER_MARK:
+        case ABOVE_ONHEAP_LOWER_MARK:
+          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
+          bestFlushableRegionSize = bestFlushableRegion.getMemStoreHeapSize();
+          break;
+        default:
+          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
+          bestFlushableRegionSize = bestFlushableRegion.getMemStoreDataSize();
+      }
       if (bestFlushableRegion != null &&
-          bestAnyRegion.getMemStoreSize() > 2 * bestFlushableRegion.getMemStoreSize()) {
+          bestAnyRegionSize > 2 * bestFlushableRegionSize) {
         // Even if it's not supposed to be flushed, pick a region if it's more than twice
         // as big as the best flushable one - otherwise when we're under pressure we make
         // lots of little flushes and cause lots of compactions, etc, which just makes
@@ -157,9 +190,10 @@ class MemStoreFlusher implements FlushRequester {
           LOG.debug("Under global heap pressure: " + "Region "
               + bestAnyRegion.getRegionInfo().getRegionNameAsString()
               + " has too many " + "store files, but is "
-              + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemStoreSize(), "", 1)
+              + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
               + " vs best flushable region's "
-              + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemStoreSize(), "", 1)
+              + TraditionalBinaryPrefix.long2String(
+              bestFlushableRegionSize, "", 1)
               + ". Choosing the bigger.");
         }
         regionToFlush = bestAnyRegion;
@@ -171,19 +205,36 @@ class MemStoreFlusher implements FlushRequester {
         }
       }
 
+      long regionToFlushSize;
+      long bestRegionReplicaSize;
+      switch(flushType) {
+        case ABOVE_OFFHEAP_HIGHER_MARK:
+        case ABOVE_OFFHEAP_LOWER_MARK:
+          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
+          bestRegionReplicaSize = bestRegionReplica.getMemStoreOffHeapSize();
+          break;
+        case ABOVE_ONHEAP_HIGHER_MARK:
+        case ABOVE_ONHEAP_LOWER_MARK:
+          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
+          bestRegionReplicaSize = bestRegionReplica.getMemStoreHeapSize();
+          break;
+        default:
+          regionToFlushSize = regionToFlush.getMemStoreDataSize();
+          bestRegionReplicaSize = bestRegionReplica.getMemStoreDataSize();
+      }
+
       Preconditions.checkState(
-        (regionToFlush != null && regionToFlush.getMemStoreSize() > 0) ||
-        (bestRegionReplica != null && bestRegionReplica.getMemStoreSize() > 0));
+        (regionToFlush != null && regionToFlushSize > 0) ||
+        (bestRegionReplica != null && bestRegionReplicaSize > 0));
 
       if (regionToFlush == null ||
           (bestRegionReplica != null &&
            ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
-           (bestRegionReplica.getMemStoreSize()
-               > secondaryMultiplier * regionToFlush.getMemStoreSize()))) {
+           (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) {
         LOG.info("Refreshing storefiles of region " + bestRegionReplica +
-            " due to global heap pressure. Total memstore datasize=" +
+            " due to global heap pressure. Total memstore off heap size=" +
             TraditionalBinaryPrefix.long2String(
-              server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) +
+              server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
             " memstore heap size=" + TraditionalBinaryPrefix.long2String(
               server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
         flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
@@ -194,11 +245,15 @@ class MemStoreFlusher implements FlushRequester {
         }
       } else {
         LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
-            "Total Memstore size=" +
+            "Flush type=" + flushType.toString() +
+            "Total Memstore Heap size=" +
             TraditionalBinaryPrefix.long2String(
-              server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) +
+                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) +
+            "Total Memstore Off-Heap size=" +
+            TraditionalBinaryPrefix.long2String(
+                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
             ", Region memstore size=" +
-            TraditionalBinaryPrefix.long2String(regionToFlush.getMemStoreSize(), "", 1));
+            TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
         flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
 
         if (!flushedOne) {
@@ -582,6 +637,7 @@ class MemStoreFlusher implements FlushRequester {
         try {
           flushType = isAboveHighWaterMark();
           while (flushType != FlushType.NORMAL && !server.isStopped()) {
+            server.cacheFlusher.setFlushType(flushType);
             if (!blocked) {
               startTime = EnvironmentEdgeManager.currentTime();
               if (!server.getRegionServerAccounting().isOffheap()) {
@@ -592,7 +648,7 @@ class MemStoreFlusher implements FlushRequester {
                 switch (flushType) {
                 case ABOVE_OFFHEAP_HIGHER_MARK:
                   logMsg("the global offheap memstore datasize",
-                      server.getRegionServerAccounting().getGlobalMemStoreDataSize(),
+                      server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                       server.getRegionServerAccounting().getGlobalMemStoreLimit());
                   break;
                 case ABOVE_ONHEAP_HIGHER_MARK:
@@ -633,8 +689,12 @@ class MemStoreFlusher implements FlushRequester {
           LOG.info("Unblocking updates for server " + server.toString());
         }
       }
-    } else if (isAboveLowWaterMark() != FlushType.NORMAL) {
-      wakeupFlushThread();
+    } else {
+      flushType = isAboveLowWaterMark();
+      if (flushType != FlushType.NORMAL) {
+        server.cacheFlusher.setFlushType(flushType);
+        wakeupFlushThread();
+      }
     }
     if(scope!= null) {
       scope.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index 6bc8886..8b77981 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -114,7 +114,7 @@ public interface MemStoreLAB {
   */
   Chunk getNewExternalJumboChunk(int size);
 
-  public static MemStoreLAB newInstance(Configuration conf) {
+  static MemStoreLAB newInstance(Configuration conf) {
     MemStoreLAB memStoreLAB = null;
     if (isEnabled(conf)) {
       String className = conf.get(MSLAB_CLASS_NAME, MemStoreLABImpl.class.getName());
@@ -124,7 +124,11 @@ public interface MemStoreLAB {
     return memStoreLAB;
   }
 
-  public static boolean isEnabled(Configuration conf) {
+  static boolean isEnabled(Configuration conf) {
     return conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT);
   }
+
+  boolean isOnHeap();
+
+  boolean isOffHeap();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index f7728ac..4ff0480 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -317,6 +317,16 @@ public class MemStoreLABImpl implements MemStoreLAB {
     return c;
   }
 
+  @Override
+  public boolean isOnHeap() {
+    return !isOffHeap();
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return this.chunkCreator.isOffheap();
+  }
+
   @VisibleForTesting
   Chunk getCurrentChunk() {
     return this.curChunk.get();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
index 557a61a..382e6e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java
@@ -27,29 +27,58 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 public class MemStoreSize {
+  // MemStore size tracks 3 sizes:
+  // (1) data size: the aggregated size of all key-value not including meta data such as
+  // index, time range etc.
+  // (2) heap size: the aggregated size of all data that is allocated on-heap including all
+  // key-values that reside on-heap and the metadata that resides on-heap
+  // (3) off-heap size: the aggregated size of all data that is allocated off-heap including all
+  // key-values that reside off-heap and the metadata that resides off-heap
+  //
+  // 3 examples to illustrate their usage:
+  // Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated
+  // on-heap. The counters are <100MB, 120MB, 0>, respectively.
+  // Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata
+  // allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively.
+  // Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB
+  // are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated
+  // off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active).
+  // The counters are <100MB, 10MB, 110MB>, respectively.
+
   /**
    *'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can
    * be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap
    * or off heap LABs
    */
-  protected long dataSize;
+  protected volatile long dataSize;
 
   /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
    * When Cells in on heap area, this will include the cells data size as well.
    */
-  protected long heapSize;
+  protected volatile long heapSize;
+
+  /** off-heap size: the aggregated size of all data that is allocated off-heap including all
+   * key-values that reside off-heap and the metadata that resides off-heap
+   */
+  protected volatile long offHeapSize;
 
   public MemStoreSize() {
-    this(0L, 0L);
+    this(0L, 0L, 0L);
   }
 
-  public MemStoreSize(long dataSize, long heapSize) {
+  public MemStoreSize(long dataSize, long heapSize, long offHeapSize) {
     this.dataSize = dataSize;
     this.heapSize = heapSize;
+    this.offHeapSize = offHeapSize;
   }
 
+  protected MemStoreSize(MemStoreSize memStoreSize) {
+    this.dataSize = memStoreSize.dataSize;
+    this.heapSize = memStoreSize.heapSize;
+    this.offHeapSize = memStoreSize.offHeapSize;
+  }
   public boolean isEmpty() {
-    return this.dataSize == 0 && this.heapSize == 0;
+    return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0;
   }
 
   public long getDataSize() {
@@ -60,24 +89,33 @@ public class MemStoreSize {
     return this.heapSize;
   }
 
+  public long getOffHeapSize() {
+    return this.offHeapSize;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
     MemStoreSize other = (MemStoreSize) obj;
-    return this.dataSize == other.dataSize && this.heapSize == other.heapSize;
+    return this.dataSize == other.dataSize
+        && this.heapSize == other.heapSize
+        && this.offHeapSize == other.offHeapSize;
   }
 
   @Override
   public int hashCode() {
     long h = 13 * this.dataSize;
     h = h + 14 * this.heapSize;
+    h = h + 15 * this.offHeapSize;
     return (int) h;
   }
 
   @Override
   public String toString() {
-    return "dataSize=" + this.dataSize + " , heapSize=" + this.heapSize;
+    return "dataSize=" + this.dataSize
+        + " , heapSize=" + this.heapSize
+        + " , offHeapSize=" + this.offHeapSize;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
index b13201d..0b3e925 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
@@ -28,23 +28,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class MemStoreSizing extends MemStoreSize {
   public static final MemStoreSizing DUD = new MemStoreSizing() {
-    @Override
-    public void incMemStoreSize(MemStoreSize delta) {
-      incMemStoreSize(delta.getDataSize(), delta.getHeapSize());
-    }
 
-    @Override
-    public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
+    @Override public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta,
+        long offHeapSizeDelta) {
       throw new RuntimeException("I'm a dud, you can't use me!");
     }
 
-    @Override
-    public void decMemStoreSize(MemStoreSize delta) {
-      decMemStoreSize(delta.getDataSize(), delta.getHeapSize());
-    }
-
-    @Override
-    public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
+    @Override public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta,
+        long offHeapSizeDelta) {
       throw new RuntimeException("I'm a dud, you can't use me!");
     }
   };
@@ -53,51 +44,38 @@ public class MemStoreSizing extends MemStoreSize {
     super();
   }
 
-  public MemStoreSizing(long dataSize, long heapSize) {
-    super(dataSize, heapSize);
+  public MemStoreSizing(long dataSize, long heapSize, long offHeapSize) {
+    super(dataSize, heapSize, offHeapSize);
   }
 
-  public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
+  public MemStoreSizing(MemStoreSize memStoreSize) {
+    super(memStoreSize);
+  }
+
+  public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
     this.dataSize += dataSizeDelta;
     this.heapSize += heapSizeDelta;
+    this.offHeapSize += offHeapSizeDelta;
   }
 
   public void incMemStoreSize(MemStoreSize delta) {
-    incMemStoreSize(delta.getDataSize(), delta.getHeapSize());
+    incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
   }
 
-  public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
+  public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) {
     this.dataSize -= dataSizeDelta;
     this.heapSize -= heapSizeDelta;
+    this.offHeapSize -= offHeapSizeDelta;
   }
 
   public void decMemStoreSize(MemStoreSize delta) {
-    decMemStoreSize(delta.getDataSize(), delta.getHeapSize());
+    decMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize());
   }
 
   public void empty() {
     this.dataSize = 0L;
     this.heapSize = 0L;
+    this.offHeapSize = 0L;
   }
 
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null || (getClass() != obj.getClass())) {
-      return false;
-    }
-    MemStoreSizing other = (MemStoreSizing) obj;
-    return this.dataSize == other.dataSize && this.heapSize == other.heapSize;
-  }
-
-  @Override
-  public int hashCode() {
-    long h = 13 * this.dataSize;
-    h = h + 14 * this.heapSize;
-    return (int) h;
-  }
-
-  @Override
-  public String toString() {
-    return "dataSize=" + this.dataSize + " , heapSize=" + this.heapSize;
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index 1a0317d..cbd60e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -30,8 +30,7 @@ import java.util.List;
 public class MemStoreSnapshot implements Closeable {
   private final long id;
   private final int cellsCount;
-  private final long dataSize;
-  private final long heapSize;
+  private final MemStoreSize memStoreSize;
   private final TimeRangeTracker timeRangeTracker;
   private final List<KeyValueScanner> scanners;
   private final boolean tagsPresent;
@@ -39,8 +38,7 @@ public class MemStoreSnapshot implements Closeable {
   public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
     this.id = id;
     this.cellsCount = snapshot.getCellsCount();
-    this.dataSize = snapshot.keySize();
-    this.heapSize = snapshot.heapSize();
+    this.memStoreSize = snapshot.getMemStoreSize();
     this.timeRangeTracker = snapshot.getTimeRangeTracker();
     this.scanners = snapshot.getScanners(Long.MAX_VALUE, Long.MAX_VALUE);
     this.tagsPresent = snapshot.isTagsPresent();
@@ -60,15 +58,12 @@ public class MemStoreSnapshot implements Closeable {
     return cellsCount;
   }
 
-  /**
-   * @return Total memory size occupied by this snapshot.
-   */
   public long getDataSize() {
-    return dataSize;
+    return memStoreSize.getDataSize();
   }
 
-  public long getHeapSize() {
-    return heapSize;
+  public MemStoreSize getMemStoreSize() {
+    return memStoreSize;
   }
 
   /**
@@ -100,4 +95,5 @@ public class MemStoreSnapshot implements Closeable {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
index 168b42e..f06f747 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
@@ -72,7 +72,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
         for (Store store : r.getStores()) {
           tempStorefilesSize += store.getStorefilesSize();
         }
-        metricsTable.setMemStoresSize(metricsTable.getMemStoresSize() + r.getMemStoreSize());
+        metricsTable.setMemStoresSize(metricsTable.getMemStoresSize() + r.getMemStoreDataSize());
         metricsTable.setStoreFilesSize(metricsTable.getStoreFilesSize() + tempStorefilesSize);
         metricsTable.setTableSize(metricsTable.getMemStoresSize() + metricsTable.getStoreFilesSize());
         metricsTable.setReadRequestsCount(metricsTable.getReadRequestsCount() + r.getReadRequestsCount());

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index fe7bdf9..1349921 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -44,7 +44,7 @@ public class MutableSegment extends Segment {
 
   protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
-    incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata
+    incSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata
   }
 
   /**
@@ -88,9 +88,10 @@ public class MutableSegment extends Segment {
             // removed cell is from MSLAB or not. Will do once HBASE-16438 is in
             int cellLen = getCellLength(cur);
             long heapSize = heapSizeChange(cur, true);
-            this.incSize(-cellLen, -heapSize);
+            long offHeapSize = offHeapSizeChange(cur, true);
+            this.incSize(-cellLen, -heapSize, -offHeapSize);
             if (memStoreSizing != null) {
-              memStoreSizing.decMemStoreSize(cellLen, heapSize);
+              memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize);
             }
             it.remove();
           } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3bb9b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 52d01fe..27771ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -150,7 +150,21 @@ public interface Region extends ConfigurationObserver {
    *         the memstores of this Region. Means size in bytes for key, value and tags within Cells.
    *         It wont consider any java heap overhead for the cell objects or any other.
    */
-  long getMemStoreSize();
+  long getMemStoreDataSize();
+
+  /**
+   * @return memstore heap size for this region, in bytes. It accounts data size of cells
+   *         added to the memstores of this Region, as well as java heap overhead for the cell
+   *         objects or any other.
+   */
+  long getMemStoreHeapSize();
+
+  /**
+   * @return memstore off-heap size for this region, in bytes. It accounts data size of cells
+   *         added to the memstores of this Region, as well as overhead for the cell
+   *         objects or any other that is allocated off-heap.
+   */
+  long getMemStoreOffHeapSize();
 
   /** @return the number of mutations processed bypassing the WAL */
   long getNumMutationsWithoutWAL();