You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2015/06/17 19:00:48 UTC

hbase git commit: HBASE-13876 Improving performance of HeapMemoryManager

Repository: hbase
Updated Branches:
  refs/heads/branch-1 5bf7945b5 -> 212cbf8f5


HBASE-13876 Improving performance of HeapMemoryManager


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/212cbf8f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/212cbf8f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/212cbf8f

Branch: refs/heads/branch-1
Commit: 212cbf8f5c3e26cc002088a118f9e99158b276bb
Parents: 5bf7945
Author: Elliott Clark <ec...@apache.org>
Authored: Tue Jun 16 13:06:15 2015 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Wed Jun 17 09:59:19 2015 -0700

----------------------------------------------------------------------
 .../regionserver/DefaultHeapMemoryTuner.java    | 248 +++++++++++++++++--
 .../hbase/regionserver/HRegionServer.java       |   3 +-
 .../hbase/regionserver/HeapMemoryManager.java   |  51 +++-
 .../hbase/util/RollingStatCalculator.java       | 113 +++++++++
 .../regionserver/TestHeapMemoryManager.java     | 199 +++++++++++++--
 5 files changed, 555 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/212cbf8f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
index 5e97b80..93a95b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
@@ -24,63 +24,234 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
 import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY;
 import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
+import org.apache.hadoop.hbase.util.RollingStatCalculator;
 
 /**
- * The default implementation for the HeapMemoryTuner. This will do simple checks to decide
- * whether there should be changes in the heap size of memstore/block cache. When there is no block
- * cache eviction at all but there are flushes because of global heap pressure, it will increase the
- * memstore heap size and decrease block cache size. The step value for this heap size change can be
- * specified using the config <i>hbase.regionserver.heapmemory.autotuner.step</i>. When there is no
- * memstore flushes because of heap pressure but there is block cache evictions it will increase the
- * block cache heap.
+ * The default implementation for the HeapMemoryTuner. This will do statistical checks on
+ * number of evictions, cache misses and flushes to decide whether there should be changes
+ * in the heap size of memstore/block cache. During each tuner operation tuner takes a step
+ * which can either be INCREASE_BLOCK_CACHE_SIZE (increase block cache size),
+ * INCREASE_MEMSTORE_SIZE (increase memstore size) and by default it is NEUTRAL (no change).
+ * We say block cache is sufficient when there is no block cache eviction at all or major amount of
+ * memory allocated to block cache is empty, similarly we say memory allocated for memstore is
+ * sufficient when there is no memstore flushes because of heap pressure or major amount of
+ * memory allocated to memstore is empty. If both are sufficient we do nothing, if exactly one of
+ * them is found to be sufficient we decrease its size by <i>step</i> and increase the other by
+ * same amount. If none of them is sufficient we do statistical analysis on number of cache misses
+ * and flushes to determine tuner direction. Based on these statistics we decide the tuner
+ * direction. If we are not confident about which step direction to take we do nothing and wait for
+ * next iteration. On expectation we will be tuning for at least 22% tuner calls. The number of
+ * past periods to consider for statistics calculation can be specified in config by
+ * <i>hbase.regionserver.heapmemory.autotuner.lookup.periods</i>. Also these many initial calls to
+ * tuner will be ignored (cache is warming up and we leave the system to reach steady state).
+ * After the tuner takes a step, in next call we insure that last call was indeed helpful and did
+ * not do us any harm. If not then we revert the previous step. The step size is dynamic and it
+ * changes based on current and previous tuning direction. When last tuner step was NEUTRAL
+ * and current tuning step is not NEUTRAL then we assume we are restarting the tuning process and
+ * step size is changed to maximum allowed size which can be specified  in config by
+ * <i>hbase.regionserver.heapmemory.autotuner.step.max</i>. If we are reverting the previous step
+ * then we decrease step size to half. This decrease is similar to binary search where we try to
+ * reach the most desired value. The minimum step size can be specified  in config by
+ * <i>hbase.regionserver.heapmemory.autotuner.step.max</i>. In other cases we leave step size
+ * unchanged.
  */
 @InterfaceAudience.Private
 class DefaultHeapMemoryTuner implements HeapMemoryTuner {
-
-  public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step";
-  public static final float DEFAULT_STEP_VALUE = 0.02f; // 2%
-
-  private static final TunerResult TUNER_RESULT = new TunerResult(true);
+  public static final String MAX_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.max";
+  public static final String MIN_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.min";
+  public static final String SUFFICIENT_MEMORY_LEVEL_KEY =
+      "hbase.regionserver.heapmemory.autotuner.sufficient.memory.level";
+  public static final String LOOKUP_PERIODS_KEY =
+      "hbase.regionserver.heapmemory.autotuner.lookup.periods";
+  public static final String NUM_PERIODS_TO_IGNORE =
+      "hbase.regionserver.heapmemory.autotuner.ignored.periods";
+  // Maximum step size that the tuner can take
+  public static final float DEFAULT_MAX_STEP_VALUE = 0.08f; // 8%
+  // Minimum step size that the tuner can take
+  public static final float DEFAULT_MIN_STEP_VALUE = 0.005f; // 0.5%
+  // If current block cache size or memstore size in use is below this level relative to memory
+  // provided to it then corresponding component will be considered to have sufficient memory
+  public static final float DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE = 0.5f; // 50%
+  // Number of tuner periods that will be considered while calculating mean and deviation
+  // If set to zero, all stats will be calculated from the start
+  public static final int DEFAULT_LOOKUP_PERIODS = 60;
+  public static final int DEFAULT_NUM_PERIODS_IGNORED = 60;
   private static final TunerResult NO_OP_TUNER_RESULT = new TunerResult(false);
 
+  private Log LOG = LogFactory.getLog(DefaultHeapMemoryTuner.class);
+  private TunerResult TUNER_RESULT = new TunerResult(true);
   private Configuration conf;
-  private float step = DEFAULT_STEP_VALUE;
+  private float sufficientMemoryLevel = DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE;
+  private float maximumStepSize = DEFAULT_MAX_STEP_VALUE;
+  private float minimumStepSize = DEFAULT_MIN_STEP_VALUE;
+  private int tunerLookupPeriods = DEFAULT_LOOKUP_PERIODS;
+  private int numPeriodsToIgnore = DEFAULT_NUM_PERIODS_IGNORED;
+  // Counter to ignore few initial periods while cache is still warming up
+  // Memory tuner will do no operation for the first "tunerLookupPeriods"
+  private int ignoreInitialPeriods = 0;
 
   private float globalMemStorePercentMinRange;
   private float globalMemStorePercentMaxRange;
   private float blockCachePercentMinRange;
   private float blockCachePercentMaxRange;
+  // Store statistics about the corresponding parameters for memory tuning
+  private RollingStatCalculator rollingStatsForCacheMisses;
+  private RollingStatCalculator rollingStatsForFlushes;
+  private RollingStatCalculator rollingStatsForEvictions;
+  // Set step size to max value for tuning, this step size will adjust dynamically while tuning
+  private float step = DEFAULT_MAX_STEP_VALUE;
+  private StepDirection prevTuneDirection = StepDirection.NEUTRAL;
 
   @Override
   public TunerResult tune(TunerContext context) {
     long blockedFlushCount = context.getBlockedFlushCount();
     long unblockedFlushCount = context.getUnblockedFlushCount();
     long evictCount = context.getEvictCount();
-    boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0;
-    boolean blockCacheSufficient = evictCount == 0;
-    if (memstoreSufficient && blockCacheSufficient) {
+    long cacheMissCount = context.getCacheMissCount();
+    long totalFlushCount = blockedFlushCount+unblockedFlushCount;
+    rollingStatsForCacheMisses.insertDataValue(cacheMissCount);
+    rollingStatsForFlushes.insertDataValue(totalFlushCount);
+    rollingStatsForEvictions.insertDataValue(evictCount);
+    StepDirection newTuneDirection = StepDirection.NEUTRAL;
+    if (ignoreInitialPeriods < numPeriodsToIgnore) {
+      // Ignoring the first few tuner periods
+      ignoreInitialPeriods++;
       return NO_OP_TUNER_RESULT;
     }
+    String tunerLog = "";
+    // We can consider memstore or block cache to be sufficient if
+    // we are using only a minor fraction of what have been already provided to it.
+    boolean earlyMemstoreSufficientCheck = totalFlushCount == 0
+            || context.getCurMemStoreUsed() < context.getCurMemStoreSize()*sufficientMemoryLevel;
+    boolean earlyBlockCacheSufficientCheck = evictCount == 0 ||
+            context.getCurBlockCacheUsed() < context.getCurBlockCacheSize()*sufficientMemoryLevel;
     float newMemstoreSize;
     float newBlockCacheSize;
-    if (memstoreSufficient) {
-      // Increase the block cache size and corresponding decrease in memstore size
-      newBlockCacheSize = context.getCurBlockCacheSize() + step;
-      newMemstoreSize = context.getCurMemStoreSize() - step;
-    } else if (blockCacheSufficient) {
-      // Increase the memstore size and corresponding decrease in block cache size
-      newBlockCacheSize = context.getCurBlockCacheSize() - step;
-      newMemstoreSize = context.getCurMemStoreSize() + step;
+    if (earlyMemstoreSufficientCheck && earlyBlockCacheSufficientCheck) {
+      // Both memstore and block cache memory seems to be sufficient. No operation required.
+      newTuneDirection = StepDirection.NEUTRAL;
+    } else if (earlyMemstoreSufficientCheck) {
+      // Increase the block cache size and corresponding decrease in memstore size.
+      newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
+    } else if (earlyBlockCacheSufficientCheck) {
+      // Increase the memstore size and corresponding decrease in block cache size.
+      newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
     } else {
-      return NO_OP_TUNER_RESULT;
-      // As of now not making any tuning in write/read heavy scenario.
+      // Early checks for sufficient memory failed. Tuning memory based on past statistics.
+      // Boolean indicator to show if we need to revert previous step or not.
+      boolean isReverting = false;
+      switch (prevTuneDirection) {
+      // Here we are using number of evictions rather than cache misses because it is more
+      // strong indicator for deficient cache size. Improving caching is what we
+      // would like to optimize for in steady state.
+      case INCREASE_BLOCK_CACHE_SIZE:
+        if ((double)evictCount > rollingStatsForEvictions.getMean() ||
+            (double)totalFlushCount > rollingStatsForFlushes.getMean() +
+            rollingStatsForFlushes.getDeviation()/2.00) {
+          // Reverting previous step as it was not useful.
+          // Tuning failed to decrease evictions or tuning resulted in large number of flushes.
+          newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
+          tunerLog += "Reverting previous tuning.";
+          if ((double)evictCount > rollingStatsForEvictions.getMean()) {
+            tunerLog += " As could not decrease evctions sufficiently.";
+          } else {
+            tunerLog += " As number of flushes rose significantly.";
+          }
+          isReverting = true;
+        }
+        break;
+      case INCREASE_MEMSTORE_SIZE:
+        if ((double)totalFlushCount > rollingStatsForFlushes.getMean() ||
+            (double)evictCount > rollingStatsForEvictions.getMean() +
+            rollingStatsForEvictions.getDeviation()/2.00) {
+          // Reverting previous step as it was not useful.
+          // Tuning failed to decrease flushes or tuning resulted in large number of evictions.
+          newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
+          tunerLog += "Reverting previous tuning.";
+          if ((double)totalFlushCount > rollingStatsForFlushes.getMean()) {
+            tunerLog += " As could not decrease flushes sufficiently.";
+          } else {
+            tunerLog += " As number of evictions rose significantly.";
+          }
+          isReverting = true;
+        }
+        break;
+      default:
+        // Last step was neutral, revert doesn't not apply here.
+        break;
+      }
+      // If we are not reverting. We try to tune memory sizes by looking at cache misses / flushes.
+      if (!isReverting){
+        // mean +- deviation/2 is considered to be normal
+        // below it its consider low and above it is considered high.
+        // We can safely assume that the number cache misses, flushes are normally distributed over
+        // past periods and hence on all the above mentioned classes (normal, high and low)
+        // are equally likely with 33% probability each. Hence there is very good probability that
+        // we will not always fall in default step.
+        if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() -
+            rollingStatsForCacheMisses.getDeviation()/2.00 &&
+            (double)totalFlushCount < rollingStatsForFlushes.getMean() -
+            rollingStatsForFlushes.getDeviation()/2.00) {
+          // Everything is fine no tuning required
+          newTuneDirection = StepDirection.NEUTRAL;
+        } else if ((double)cacheMissCount > rollingStatsForCacheMisses.getMean() +
+            rollingStatsForCacheMisses.getDeviation()/2.00 &&
+            (double)totalFlushCount < rollingStatsForFlushes.getMean() -
+            rollingStatsForFlushes.getDeviation()/2.00) {
+          // more misses , increasing cache size
+          newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
+          tunerLog +=
+              "Increasing block cache size as observed increase in number of cache misses.";
+        } else if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() -
+            rollingStatsForCacheMisses.getDeviation()/2.00 &&
+            (double)totalFlushCount > rollingStatsForFlushes.getMean() +
+            rollingStatsForFlushes.getDeviation()/2.00) {
+          // more flushes , increasing memstore size
+          newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
+          tunerLog += "Increasing memstore size as observed increase in number of flushes.";
+        } else {
+          // Default. Not enough facts to do tuning.
+          newTuneDirection = StepDirection.NEUTRAL;
+        }
+      }
+    }
+    // Adjusting step size for tuning to get to steady state.
+    // Even if the step size was 4% and 32 GB memory size, we will be shifting 1 GB back and forth
+    // per tuner operation and it can affect the performance of cluster
+    if (prevTuneDirection == StepDirection.NEUTRAL && newTuneDirection != StepDirection.NEUTRAL) {
+      // Restarting the tuning from steady state.
+      step = maximumStepSize;
+    } else if (prevTuneDirection != newTuneDirection) {
+      // Decrease the step size to reach the steady state. Similar procedure as binary search.
+      step = step/2.00f;
+      if (step < minimumStepSize) {
+        // Ensure step size does not gets too small.
+        step = minimumStepSize;
+      }
     }
+    // Increase / decrease the memstore / block cahce sizes depending on new tuner step.
+    switch (newTuneDirection) {
+    case INCREASE_BLOCK_CACHE_SIZE:
+        newBlockCacheSize = context.getCurBlockCacheSize() + step;
+        newMemstoreSize = context.getCurMemStoreSize() - step;
+        break;
+    case INCREASE_MEMSTORE_SIZE:
+        newBlockCacheSize = context.getCurBlockCacheSize() - step;
+        newMemstoreSize = context.getCurMemStoreSize() + step;
+        break;
+    default:
+        prevTuneDirection = StepDirection.NEUTRAL;
+        return NO_OP_TUNER_RESULT;
+    }
+    // Check we are within max/min bounds.
     if (newMemstoreSize > globalMemStorePercentMaxRange) {
       newMemstoreSize = globalMemStorePercentMaxRange;
     } else if (newMemstoreSize < globalMemStorePercentMinRange) {
@@ -93,6 +264,10 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
     }
     TUNER_RESULT.setBlockCacheSize(newBlockCacheSize);
     TUNER_RESULT.setMemstoreSize(newMemstoreSize);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(tunerLog);
+    }
+    prevTuneDirection = newTuneDirection;
     return TUNER_RESULT;
   }
 
@@ -104,7 +279,12 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE);
+    this.maximumStepSize = conf.getFloat(MAX_STEP_KEY, DEFAULT_MAX_STEP_VALUE);
+    this.minimumStepSize = conf.getFloat(MIN_STEP_KEY, DEFAULT_MIN_STEP_VALUE);
+    this.step = this.maximumStepSize;
+    this.sufficientMemoryLevel = conf.getFloat(SUFFICIENT_MEMORY_LEVEL_KEY,
+        DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE);
+    this.tunerLookupPeriods = conf.getInt(LOOKUP_PERIODS_KEY, DEFAULT_LOOKUP_PERIODS);
     this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY,
         conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
     this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
@@ -113,5 +293,19 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
         HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
     this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
         HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+    // Default value of periods to ignore is number of lookup periods
+    this.numPeriodsToIgnore = conf.getInt(NUM_PERIODS_TO_IGNORE, this.tunerLookupPeriods);
+    this.rollingStatsForCacheMisses = new RollingStatCalculator(this.tunerLookupPeriods);
+    this.rollingStatsForFlushes = new RollingStatCalculator(this.tunerLookupPeriods);
+    this.rollingStatsForEvictions = new RollingStatCalculator(this.tunerLookupPeriods);
+  }
+
+  private enum StepDirection{
+    // block cache size was increased
+    INCREASE_BLOCK_CACHE_SIZE,
+    // memstore size was increased
+    INCREASE_MEMSTORE_SIZE,
+    // no operation was performed
+    NEUTRAL
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/212cbf8f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3b9cc03..af3cb5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1375,7 +1375,8 @@ public class HRegionServer extends HasThread implements
   }
 
   private void startHeapMemoryManager() {
-    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
+    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
+        this, this.regionServerAccounting);
     if (this.hMemManager != null) {
       this.hMemManager.start(getChoreService());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/212cbf8f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index fe9df71..ba6e959 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -76,6 +76,7 @@ public class HeapMemoryManager {
   private final ResizableBlockCache blockCache;
   private final FlushRequester memStoreFlusher;
   private final Server server;
+  private final RegionServerAccounting regionServerAccounting;
 
   private HeapMemoryTunerChore heapMemTunerChore = null;
   private final boolean tunerOn;
@@ -85,21 +86,23 @@ public class HeapMemoryManager {
   private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
 
   public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
-      Server server) {
+                Server server, RegionServerAccounting regionServerAccounting) {
     BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
     if (blockCache instanceof ResizableBlockCache) {
-      return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server);
+      return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server,
+                 regionServerAccounting);
     }
     return null;
   }
 
   @VisibleForTesting
   HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
-      Server server) {
+                Server server, RegionServerAccounting regionServerAccounting) {
     Configuration conf = server.getConfiguration();
     this.blockCache = blockCache;
     this.memStoreFlusher = memStoreFlusher;
     this.server = server;
+    this.regionServerAccounting = regionServerAccounting;
     this.tunerOn = doInit(conf);
     this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
       HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
@@ -217,6 +220,7 @@ public class HeapMemoryManager {
     private AtomicLong blockedFlushCount = new AtomicLong();
     private AtomicLong unblockedFlushCount = new AtomicLong();
     private long evictCount = 0L;
+    private long cacheMissCount = 0L;
     private TunerContext tunerContext = new TunerContext();
     private boolean alarming = false;
 
@@ -265,11 +269,21 @@ public class HeapMemoryManager {
     }
 
     private void tune() {
-      long curEvictCount = blockCache.getStats().getEvictedCount();
+      // TODO check if we can increase the memory boundaries
+      // while remaining in the limits
+      long curEvictCount;
+      long curCacheMisCount;
+      curEvictCount = blockCache.getStats().getEvictedCount();
       tunerContext.setEvictCount(curEvictCount - evictCount);
       evictCount = curEvictCount;
+      curCacheMisCount = blockCache.getStats().getMissCachingCount();
+      tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount);
+      cacheMissCount = curCacheMisCount;
       tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
       tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
+      tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize);
+      tunerContext.setCurMemStoreUsed(
+                 (float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
       tunerContext.setCurBlockCacheSize(blockCachePercent);
       tunerContext.setCurMemStoreSize(globalMemStorePercent);
       TunerResult result = null;
@@ -322,6 +336,8 @@ public class HeapMemoryManager {
           globalMemStorePercent = memstoreSize;
           memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
         }
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("No changes made by HeapMemoryTuner.");
       }
     }
 
@@ -350,6 +366,9 @@ public class HeapMemoryManager {
     private long blockedFlushCount;
     private long unblockedFlushCount;
     private long evictCount;
+    private long cacheMissCount;
+    private float curBlockCacheUsed;
+    private float curMemStoreUsed;
     private float curMemStoreSize;
     private float curBlockCacheSize;
 
@@ -392,6 +411,30 @@ public class HeapMemoryManager {
     public void setCurBlockCacheSize(float curBlockCacheSize) {
       this.curBlockCacheSize = curBlockCacheSize;
     }
+
+    public long getCacheMissCount() {
+      return cacheMissCount;
+    }
+
+    public void setCacheMissCount(long cacheMissCount) {
+      this.cacheMissCount = cacheMissCount;
+    }
+
+    public float getCurBlockCacheUsed() {
+      return curBlockCacheUsed;
+    }
+
+    public void setCurBlockCacheUsed(float curBlockCacheUsed) {
+      this.curBlockCacheUsed = curBlockCacheUsed;
+    }
+
+    public float getCurMemStoreUsed() {
+      return curMemStoreUsed;
+    }
+
+    public void setCurMemStoreUsed(float d) {
+        this.curMemStoreUsed = d;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/212cbf8f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java
new file mode 100644
index 0000000..554d6f5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+/**
+ * This class maintains mean and variation for any sequence of input provided to it.
+ * It is initialized with number of rolling periods which basically means the number of past
+ * inputs whose data will be considered to maintain mean and variation.
+ * It will use O(N) memory to maintain these statistics, where N is number of look up periods it
+ * was initialized with.
+ * If zero is passed during initialization then it will maintain mean and variance from the
+ * start. It will use O(1) memory only. But note that since it will maintain mean / variance
+ * from the start the statistics may behave like constants and may ignore short trends.
+ * All operations are O(1) except the initialization which is O(N).
+ */
+public class RollingStatCalculator {
+  private double currentSum;
+  private double currentSqrSum;
+  // Total number of data values whose statistic is currently present
+  private long numberOfDataValues;
+  private int rollingPeriod;
+  private int currentIndexPosition;
+  // to be used only if we have non-zero rolling period
+  private long [] dataValues;
+
+  /**
+   * Creates a RollingStatCalculator with given number of rolling periods.
+   * @param rollingPeriod
+   */
+  public RollingStatCalculator(int rollingPeriod) {
+    this.rollingPeriod = rollingPeriod;
+    this.dataValues = fillWithZeros(rollingPeriod);
+    this.currentSum = 0.0;
+    this.currentSqrSum = 0.0;
+    this.currentIndexPosition = 0;
+    this.numberOfDataValues = 0;
+  }
+
+  /**
+   * Inserts given data value to array of data values to be considered for statistics calculation
+   * @param data
+   */
+  public void insertDataValue(long data) {
+    // if current number of data points already equals rolling period and rolling period is
+    // non-zero then remove one data and update the statistics
+    if(numberOfDataValues >= rollingPeriod && rollingPeriod > 0) {
+      this.removeData(dataValues[currentIndexPosition]);
+    }
+    numberOfDataValues++;
+    currentSum = currentSum + (double)data;
+    currentSqrSum = currentSqrSum + ((double)data * data);
+    if (rollingPeriod >0)
+    {
+      dataValues[currentIndexPosition] = data;
+      currentIndexPosition = (currentIndexPosition + 1) % rollingPeriod;
+    }
+  }
+
+  /**
+   * Update the statistics after removing the given data value
+   * @param data
+   */
+  private void removeData(long data) {
+    currentSum = currentSum - (double)data;
+    currentSqrSum = currentSqrSum - ((double)data * data);
+    numberOfDataValues--;
+  }
+
+  /**
+   * @return mean of the data values that are in the current list of data values
+   */
+  public double getMean() {
+    return this.currentSum / (double)numberOfDataValues;
+  }
+
+  /**
+   * @return deviation of the data values that are in the current list of data values
+   */
+  public double getDeviation() {
+    double variance = (currentSqrSum - (currentSum*currentSum)/(double)(numberOfDataValues))/
+        numberOfDataValues;
+    return Math.sqrt(variance);
+  }
+
+  /**
+   * @param size
+   * @return an array of given size initialized with zeros
+   */
+  private long [] fillWithZeros(int size) {
+    long [] zeros = new long [size];
+    for (int i=0; i<size; i++) {
+      zeros[i] = 0L;
+    }
+    return zeros;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/212cbf8f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index b514581..10e125e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -60,7 +60,7 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
     HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
-        new MemstoreFlusherStub(0), new RegionServerStub(conf));
+        new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub());
     assertFalse(manager.isTunerOn());
   }
 
@@ -70,7 +70,7 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f);
     HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
-        new MemstoreFlusherStub(0), new RegionServerStub(conf));
+        new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub());
     assertFalse(manager.isTunerOn());
   }
 
@@ -82,7 +82,8 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f);
     try {
-      new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
+      new HeapMemoryManager(blockCache, memStoreFlusher,
+          new RegionServerStub(conf), new RegionServerAccountingStub());
       fail();
     } catch (RuntimeException e) {
     }
@@ -90,25 +91,96 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.2f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
     try {
-      new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
+      new HeapMemoryManager(blockCache, memStoreFlusher,
+          new RegionServerStub(conf), new RegionServerAccountingStub());
       fail();
     } catch (RuntimeException e) {
     }
   }
 
   @Test
+  public void testWhenClusterIsWriteHeavyWithEmptyMemstore() throws Exception {
+    BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+    MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+    RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
+    // Empty block cache and memstore
+    blockCache.setTestBlockSize(0);
+    regionServerAccounting.setTestMemstoreSize(0);
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
+    conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
+    conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+    conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
+    conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
+    // Let the system start with default values for memstore heap and block cache size.
+    HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+        new RegionServerStub(conf), regionServerAccounting);
+    long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
+    long oldBlockCacheSize = blockCache.maxSize;
+    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
+    heapMemoryManager.start(choreService);
+    memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
+    memStoreFlusher.requestFlush(null, false);
+    Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+    // No changes should be made by tuner as we already have lot of empty space
+    assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
+    assertEquals(oldBlockCacheSize, blockCache.maxSize);
+  }
+
+  @Test
+  public void testWhenClusterIsReadHeavyWithEmptyBlockCache() throws Exception {
+    BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+    MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+    RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
+    // Empty block cache and memstore
+    blockCache.setTestBlockSize(0);
+    regionServerAccounting.setTestMemstoreSize(0);
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
+    conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
+    conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+    conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
+    conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
+    // Let the system start with default values for memstore heap and block cache size.
+    HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+        new RegionServerStub(conf), regionServerAccounting);
+    long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
+    long oldBlockCacheSize = blockCache.maxSize;
+    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
+    heapMemoryManager.start(choreService);
+    blockCache.evictBlock(null);
+    blockCache.evictBlock(null);
+    blockCache.evictBlock(null);
+    Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+    // No changes should be made by tuner as we already have lot of empty space
+    assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
+    assertEquals(oldBlockCacheSize, blockCache.maxSize);
+  }
+
+  @Test
   public void testWhenClusterIsWriteHeavy() throws Exception {
     BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
     MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+    RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
+    // Empty block cache and but nearly filled memstore
+    blockCache.setTestBlockSize(0);
+    regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
     Configuration conf = HBaseConfiguration.create();
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
     conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
     // Let the system start with default values for memstore heap and block cache size.
     HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
-        new RegionServerStub(conf));
+        new RegionServerStub(conf), regionServerAccounting);
     long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
     long oldBlockCacheSize = blockCache.maxSize;
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@@ -120,9 +192,9 @@ public class TestHeapMemoryManager {
     memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
     memStoreFlusher.requestFlush(null, false);
     Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
-    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
+    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
-    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize,
+    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldBlockCacheSize,
         blockCache.maxSize);
     oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
     oldBlockCacheSize = blockCache.maxSize;
@@ -131,9 +203,9 @@ public class TestHeapMemoryManager {
     memStoreFlusher.requestFlush(null, false);
     memStoreFlusher.requestFlush(null, false);
     Thread.sleep(1500);
-    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
+    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
-    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize,
+    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldBlockCacheSize,
         blockCache.maxSize);
   }
 
@@ -141,15 +213,20 @@ public class TestHeapMemoryManager {
   public void testWhenClusterIsReadHeavy() throws Exception {
     BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
     MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+    RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
+    // Empty memstore and but nearly filled block cache
+    blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
+    regionServerAccounting.setTestMemstoreSize(0);
     Configuration conf = HBaseConfiguration.create();
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
     conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
     // Let the system start with default values for memstore heap and block cache size.
     HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
-        new RegionServerStub(conf));
+        new RegionServerStub(conf), new RegionServerAccountingStub());
     long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
     long oldBlockCacheSize = blockCache.maxSize;
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@@ -158,18 +235,63 @@ public class TestHeapMemoryManager {
     blockCache.evictBlock(null);
     blockCache.evictBlock(null);
     Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
-    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
+    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
-    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize,
+    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldBlockCacheSize,
         blockCache.maxSize);
     oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
     oldBlockCacheSize = blockCache.maxSize;
     // Do some more evictions before the next run of HeapMemoryTuner
     blockCache.evictBlock(null);
     Thread.sleep(1500);
-    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
+    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldMemstoreHeapSize,
+        memStoreFlusher.memstoreSize);
+    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldBlockCacheSize,
+        blockCache.maxSize);
+  }
+
+  @Test
+  public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception {
+    BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+    MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+    RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
+    // Both memstore and block cache are nearly filled
+    blockCache.setTestBlockSize(0);
+    regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
+    blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
+    conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
+    conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+    conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
+    conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
+    // Let the system start with default values for memstore heap and block cache size.
+    HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+        new RegionServerStub(conf), regionServerAccounting);
+    long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
+    long oldBlockCacheSize = blockCache.maxSize;
+    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
+    heapMemoryManager.start(choreService);
+    memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
+    blockCache.evictBlock(null);
+    memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
+    memStoreFlusher.requestFlush(null, false);
+    Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+    // No changes should happen as there is undefined increase in flushes and evictions
+    assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
+    assertEquals(oldBlockCacheSize, blockCache.maxSize);
+    // Do some more flushes before the next run of HeapMemoryTuner
+    memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
+    Thread.sleep(1500);
+    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
-    assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize,
+    assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldBlockCacheSize,
         blockCache.maxSize);
   }
 
@@ -183,11 +305,12 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.02f);
     conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
     conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
         HeapMemoryTuner.class);
     // Let the system start with default values for memstore heap and block cache size.
     HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
-        new RegionServerStub(conf));
+        new RegionServerStub(conf), new RegionServerAccountingStub());
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
     heapMemoryManager.start(choreService);
     // Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner
@@ -214,10 +337,11 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
     conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
     conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
         HeapMemoryTuner.class);
     HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
-        new RegionServerStub(conf));
+        new RegionServerStub(conf), new RegionServerAccountingStub());
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
     heapMemoryManager.start(choreService);
     CustomHeapMemoryTuner.memstoreSize = 0.78f;
@@ -239,10 +363,11 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
     conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
     conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
         HeapMemoryTuner.class);
     HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
-        new RegionServerStub(conf));
+        new RegionServerStub(conf), new RegionServerAccountingStub());
     long oldMemstoreSize = memStoreFlusher.memstoreSize;
     long oldBlockCacheSize = blockCache.maxSize;
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@@ -264,7 +389,7 @@ public class TestHeapMemoryManager {
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
-
+    conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
     conf.setFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, 0.4F);
     conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.3F);
     conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0.1F);
@@ -275,8 +400,8 @@ public class TestHeapMemoryManager {
         HeapMemoryTuner.class);
 
     try {
-      heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(
-          conf));
+      heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+          new RegionServerStub(conf), new RegionServerAccountingStub());
       fail("Should have failed as the collective heap memory need is above 80%");
     } catch (Exception e) {
     }
@@ -284,8 +409,8 @@ public class TestHeapMemoryManager {
     // Change the max/min ranges for memstore and bock cache so as to pass the criteria check
     conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f);
     conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f);
-    heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(
-        conf));
+    heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+        new RegionServerStub(conf), new RegionServerAccountingStub());
     long oldMemstoreSize = memStoreFlusher.memstoreSize;
     long oldBlockCacheSize = blockCache.maxSize;
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@@ -309,18 +434,23 @@ public class TestHeapMemoryManager {
     assertEquals(expected, currentHeapSpace);
   }
 
-  private void assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) {
-    long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent);
+  private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) {
+    double expctedMinDelta = (double) (this.maxHeapSize * expectedDeltaPercent);
+    // Tolerable error
+    double error = 0.999;
     if (expectedDeltaPercent > 0) {
-      assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace));
+      assertTrue(expctedMinDelta*error <= (double)(newHeapSpace - oldHeapSpace));
+      assertTrue(expctedMinDelta/error >= (double)(newHeapSpace - oldHeapSpace));
     } else {
-      assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace));
+      assertTrue(-expctedMinDelta*error <= (double)(oldHeapSpace - newHeapSpace));
+      assertTrue(-expctedMinDelta/error >= (double)(oldHeapSpace - newHeapSpace));
     }
   }
 
   private static class BlockCacheStub implements ResizableBlockCache {
     CacheStats stats = new CacheStats("test");
     long maxSize = 0;
+    private long testBlockSize = 0;
 
     public BlockCacheStub(long size){
       this.maxSize = size;
@@ -377,7 +507,7 @@ public class TestHeapMemoryManager {
 
     @Override
     public long getCurrentSize() {
-      return 0;
+      return this.testBlockSize;
     }
 
     @Override
@@ -399,6 +529,10 @@ public class TestHeapMemoryManager {
     public BlockCache[] getBlockCaches() {
       return null;
     }
+
+	public void setTestBlockSize(long testBlockSize) {
+		this.testBlockSize = testBlockSize;
+	}
   }
 
   private static class MemstoreFlusherStub implements FlushRequester {
@@ -525,4 +659,15 @@ public class TestHeapMemoryManager {
       return result;
     }
   }
+
+  private static class RegionServerAccountingStub extends RegionServerAccounting {
+    private long testMemstoreSize = 0;
+    @Override
+    public long getGlobalMemstoreSize() {
+      return testMemstoreSize;
+    }
+    public void setTestMemstoreSize(long testMemstoreSize) {
+      this.testMemstoreSize = testMemstoreSize;
+    }
+  }
 }