You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by lg...@apache.org on 2018/08/24 15:58:32 UTC

[geode] branch develop updated: GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)

This is an automated email from the ASF dual-hosted git repository.

lgallinat pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 583d141  GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)
583d141 is described below

commit 583d1416644a8392ed1bf257b87025a9071aaa55
Author: Lynn Gallinat <lg...@pivotal.io>
AuthorDate: Fri Aug 24 08:58:27 2018 -0700

    GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)
    
    * GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and
               DistributedRegion.memoryThresholdReachedMembers must be atomic.
---
 .../management/MemoryThresholdsDUnitTest.java      |   2 +-
 .../MemoryThresholdsOffHeapDUnitTest.java          |  18 +-
 .../cache/query/internal/DefaultQueryService.java  |  17 +-
 .../internal/cache/CacheDistributionAdvisor.java   |   2 +-
 .../geode/internal/cache/DistributedRegion.java    |  41 +--
 .../geode/internal/cache/InternalRegion.java       |   2 +
 .../apache/geode/internal/cache/LocalRegion.java   |  63 ++--
 .../geode/internal/cache/MemoryThresholdInfo.java  |  53 ++++
 .../geode/internal/cache/PartitionedRegion.java    |  29 +-
 .../internal/cache/control/HeapMemoryMonitor.java  |  70 ++++-
 .../internal/cache/control/ResourceAdvisor.java    |   2 +-
 .../execute/DistributedRegionFunctionExecutor.java |  16 +-
 .../cache/execute/MemberFunctionExecutor.java      |  19 +-
 .../cache/execute/MultiRegionFunctionExecutor.java |  32 +-
 .../execute/PartitionedRegionFunctionExecutor.java |  17 +-
 .../partitioned/PartitionedRegionRebalanceOp.java  |   2 +-
 .../internal/cache/partitioned/RegionAdvisor.java  |   4 +-
 .../tier/sockets/command/ExecuteFunction.java      |  19 +-
 .../tier/sockets/command/ExecuteFunction65.java    |  20 +-
 .../tier/sockets/command/ExecuteFunction66.java    |  20 +-
 .../internal/cache/DistributedRegionJUnitTest.java |  32 ++
 .../internal/cache/MemoryThresholdInfoTest.java    |  40 +++
 .../cache/control/HeapMemoryMonitorTest.java       | 325 +++++++++++++++++++++
 .../sockets/command/ExecuteFunction66Test.java     |   4 +
 24 files changed, 638 insertions(+), 211 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
index d8cfec4..79c522e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
@@ -303,7 +303,7 @@ public class MemoryThresholdsDUnitTest extends ClientServerTestCase {
 
           public boolean done() {
             DistributedRegion dr = (DistributedRegion) getRootRegion().getSubregion(regionName);
-            return dr.getMemoryThresholdReachedMembers().size() == 0;
+            return dr.getAtomicThresholdInfo().getMembersThatReachedThreshold().size() == 0;
           }
         };
         Wait.waitForCriterion(wc, 30000, 10, true);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
index 330b974..907456c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
@@ -494,7 +494,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
 
           public boolean done() {
             DistributedRegion dr = (DistributedRegion) getRootRegion().getSubregion(regionName);
-            return dr.getMemoryThresholdReachedMembers().size() == 0;
+            return dr.getAtomicThresholdInfo().getMembersThatReachedThreshold().size() == 0;
           }
         };
         Wait.waitForCriterion(wc, 10000, 10, true);
@@ -602,7 +602,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
               }
 
               public boolean done() {
-                return r.memoryThresholdReached.get();
+                return r.isMemoryThresholdReached();
               }
             };
             Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -618,7 +618,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
               }
 
               public boolean done() {
-                return !r.memoryThresholdReached.get();
+                return !r.isMemoryThresholdReached();
               }
             };
             Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -670,7 +670,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
           }
 
           public boolean done() {
-            return r.memoryThresholdReached.get();
+            return r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -686,7 +686,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
           }
 
           public boolean done() {
-            return !r.memoryThresholdReached.get();
+            return !r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1235,7 +1235,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
           }
 
           public boolean done() {
-            return r.memoryThresholdReached.get();
+            return r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1258,7 +1258,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
           }
 
           public boolean done() {
-            return !r.memoryThresholdReached.get();
+            return !r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1514,12 +1514,12 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
             @Override
             public String description() {
               return "Expected to go critical: isCritical=" + ohm.getState().isCritical()
-                  + " memoryThresholdReached=" + r.memoryThresholdReached.get();
+                  + " memoryThresholdReached=" + r.isMemoryThresholdReached();
             }
 
             @Override
             public boolean done() {
-              return ohm.getState().isCritical() && r.memoryThresholdReached.get();
+              return ohm.getState().isCritical() && r.isMemoryThresholdReached();
             }
           };
         }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
index 2d15795..e476d45 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
@@ -68,7 +68,9 @@ import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.MemoryThresholdInfo;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -226,12 +228,15 @@ public class DefaultQueryService implements InternalQueryService {
     // UnsupportedOperationException(LocalizedStrings.DefaultQueryService_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_REGIONS_WHICH_OVERFLOW_TO_DISK_THE_REGION_INVOLVED_IS_0.toLocalizedString(regionPath));
     // }
     // if its a pr the create index on all of the local buckets.
-    if (((LocalRegion) region).memoryThresholdReached.get()
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      LocalRegion lr = (LocalRegion) region;
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_INDEX.toLocalizedString(region.getName()),
-          lr.getMemoryThresholdReachedMembers());
+    if (!MemoryThresholds.isLowMemoryExceptionDisabled()) {
+      InternalRegion internalRegion = (InternalRegion) region;
+      MemoryThresholdInfo info = internalRegion.getAtomicThresholdInfo();
+      if (info.isMemoryThresholdReached()) {
+        throw new LowMemoryException(
+            LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_INDEX
+                .toLocalizedString(region.getName()),
+            info.getMembersThatReachedThreshold());
+      }
     }
     if (region instanceof PartitionedRegion) {
       try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 6a726ce..2a74e7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -1103,7 +1103,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       logger.debug("CDA: removing profile {}", profile);
     }
     if (getAdvisee() instanceof LocalRegion && profile != null) {
-      ((LocalRegion) getAdvisee()).removeMemberFromCriticalList(profile.getDistributedMember());
+      ((LocalRegion) getAdvisee()).removeCriticalMember(profile.getDistributedMember());
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 9b395fc..d9f5903 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1113,7 +1112,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   /**
    * A reference counter to protected the memoryThresholdReached boolean
    */
-  private final Set<DistributedMember> memoryThresholdReachedMembers = new CopyOnWriteArraySet<>();
+  private final Set<DistributedMember> memoryThresholdReachedMembers = new HashSet<>();
 
   // TODO: cleanup getInitialImageAndRecovery
   private void getInitialImageAndRecovery(InputStream snapshotInputStream,
@@ -3751,42 +3750,35 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
       if (event.getState().isCritical() && !event.getPreviousState().isCritical()
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
-        setMemoryThresholdReachedCounterTrue(event.getMember());
+        addCriticalMember(event.getMember());
       } else if (!event.getState().isCritical() && event.getPreviousState().isCritical()
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
-        removeMemberFromCriticalList(event.getMember());
+        removeCriticalMember(event.getMember());
       }
     }
   }
 
   @Override
-  public void removeMemberFromCriticalList(DistributedMember member) {
+  public void removeCriticalMember(DistributedMember member) {
     if (logger.isDebugEnabled()) {
       logger.debug("DR: removing member {} from critical member list", member);
     }
     synchronized (this.memoryThresholdReachedMembers) {
       this.memoryThresholdReachedMembers.remove(member);
-      if (this.memoryThresholdReachedMembers.size() == 0) {
-        memoryThresholdReached.set(false);
+      if (this.memoryThresholdReachedMembers.isEmpty()) {
+        setMemoryThresholdReached(false);
       }
     }
   }
 
   @Override
-  public Set<DistributedMember> getMemoryThresholdReachedMembers() {
-    synchronized (this.memoryThresholdReachedMembers) {
-      return Collections.unmodifiableSet(this.memoryThresholdReachedMembers);
-    }
-  }
-
-  @Override
   public void initialCriticalMembers(boolean localMemoryIsCritical,
       Set<InternalDistributedMember> criticalMembers) {
     Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
     for (InternalDistributedMember idm : criticalMembers) {
       if (others.contains(idm)) {
-        setMemoryThresholdReachedCounterTrue(idm);
+        addCriticalMember(idm);
       }
     }
   }
@@ -3794,12 +3786,23 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   /**
    * @param idm member whose threshold has been exceeded
    */
-  private void setMemoryThresholdReachedCounterTrue(final DistributedMember idm) {
+  protected void addCriticalMember(final DistributedMember idm) {
     synchronized (this.memoryThresholdReachedMembers) {
-      this.memoryThresholdReachedMembers.add(idm);
-      if (this.memoryThresholdReachedMembers.size() > 0) {
-        memoryThresholdReached.set(true);
+      if (this.memoryThresholdReachedMembers.isEmpty()) {
+        setMemoryThresholdReached(true);
       }
+      this.memoryThresholdReachedMembers.add(idm);
+    }
+  }
+
+  @Override
+  public MemoryThresholdInfo getAtomicThresholdInfo() {
+    if (!isMemoryThresholdReached()) {
+      return MemoryThresholdInfo.getNotReached();
+    }
+    synchronized (memoryThresholdReachedMembers) {
+      return new MemoryThresholdInfo(isMemoryThresholdReached(),
+          new HashSet<DistributedMember>(memoryThresholdReachedMembers));
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 8b887ef..78a664e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -408,4 +408,6 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
 
   default void handleWANEvent(EntryEventImpl event) {}
 
+  MemoryThresholdInfo getAtomicThresholdInfo();
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 482bf0c..92c2e3c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -488,7 +488,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * This boolean is true when a member who has this region is running low on memory. It is used to
    * reject region operations.
    */
-  public final AtomicBoolean memoryThresholdReached = new AtomicBoolean(false);
+  private final AtomicBoolean memoryThresholdReached = new AtomicBoolean(false);
 
   /**
    * Lock for updating PR MetaData on client side
@@ -1023,13 +1023,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
             if (!newRegion.getOffHeap()) {
               newRegion.initialCriticalMembers(
                   this.cache.getInternalResourceManager().getHeapMonitor().getState().isCritical(),
-                  this.cache.getResourceAdvisor().adviseCritialMembers());
+                  this.cache.getResourceAdvisor().adviseCriticalMembers());
             } else {
               newRegion.initialCriticalMembers(
                   this.cache.getInternalResourceManager().getHeapMonitor().getState().isCritical()
                       || this.cache.getInternalResourceManager().getOffHeapMonitor().getState()
                           .isCritical(),
-                  this.cache.getResourceAdvisor().adviseCritialMembers());
+                  this.cache.getResourceAdvisor().adviseCriticalMembers());
             }
 
             // synchronization would be done on ManagementAdapter.regionOpLock
@@ -2912,7 +2912,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   protected boolean isMemoryThresholdReachedForLoad() {
-    return this.memoryThresholdReached.get();
+    return isMemoryThresholdReached();
   }
 
   /**
@@ -5736,14 +5736,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * @throws LowMemoryException if the target member for this operation is sick
    */
   private void checkIfAboveThreshold(final Object key) throws LowMemoryException {
-    if (this.memoryThresholdReached.get()) {
-      Set<DistributedMember> membersThatReachedThreshold = getMemoryThresholdReachedMembers();
-
+    MemoryThresholdInfo info = getAtomicThresholdInfo();
+    if (info.isMemoryThresholdReached()) {
+      Set<DistributedMember> membersThatReachedThreshold = info.getMembersThatReachedThreshold();
       // #45603: trigger a background eviction since we're above the the critical
       // threshold
       InternalResourceManager.getInternalResourceManager(this.cache).getHeapMonitor()
           .updateStateAndSendEvent();
-
       throw new LowMemoryException(
           LocalizedStrings.ResourceManager_LOW_MEMORY_IN_0_FOR_PUT_1_MEMBER_2.toLocalizedString(
               getFullPath(), key, membersThatReachedThreshold),
@@ -5751,6 +5750,15 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
+  @Override
+  public MemoryThresholdInfo getAtomicThresholdInfo() {
+    if (!isMemoryThresholdReached()) {
+      return MemoryThresholdInfo.getNotReached();
+    }
+    return new MemoryThresholdInfo(isMemoryThresholdReached(),
+        Collections.singleton(this.cache.getMyId()));
+  }
+
   /**
    * Allows null as new value to accommodate create with a null value.
    *
@@ -10726,13 +10734,16 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       final Function function, final Object args, final ResultCollector rc, final Set filter,
       final ServerToClientFunctionResultSender sender) {
 
-    if (function.optimizeForWrite() && this.memoryThresholdReached.get()
+    if (function.optimizeForWrite()
         && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<DistributedMember> members = getMemoryThresholdReachedMembers();
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), members),
-          members);
+      MemoryThresholdInfo info = getAtomicThresholdInfo();
+      if (info.isMemoryThresholdReached()) {
+        Set<DistributedMember> members = info.getMembersThatReachedThreshold();
+        throw new LowMemoryException(
+            LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
+                .toLocalizedString(function.getId(), members),
+            members);
+      }
     }
     final LocalResultCollector<?, ?> resultCollector =
         execution.getLocalResultCollector(function, rc);
@@ -10747,13 +10758,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return resultCollector;
   }
 
-  /**
-   * @return the set of members which are known to be critical
-   */
-  public Set<DistributedMember> getMemoryThresholdReachedMembers() {
-    return Collections.singleton(this.cache.getMyId());
-  }
-
   @Override
   public void onEvent(MemoryEvent event) {
     if (logger.isDebugEnabled()) {
@@ -10769,11 +10773,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
         // start rejecting operations
-        this.memoryThresholdReached.set(true);
+        setMemoryThresholdReached(true);
       } else if (!event.getState().isCritical() && event.getPreviousState().isCritical()
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
-        this.memoryThresholdReached.set(false);
+        setMemoryThresholdReached(false);
       }
     }
   }
@@ -10836,7 +10840,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * This method is meant to be overridden by DistributedRegion and PartitionedRegions to cleanup
    * CRITICAL state
    */
-  public void removeMemberFromCriticalList(DistributedMember member) {
+  public void removeCriticalMember(DistributedMember member) {
     // should not be called for LocalRegion
     Assert.assertTrue(false);
   }
@@ -10857,7 +10861,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       Set<InternalDistributedMember> criticalMembers) {
     assert getScope().isLocal();
     if (localMemoryIsCritical) {
-      this.memoryThresholdReached.set(true);
+      setMemoryThresholdReached(true);
     }
   }
 
@@ -12252,4 +12256,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   public Lock getClientMetaDataLock() {
     return clientMetaDataLock;
   }
+
+  public boolean isMemoryThresholdReached() {
+    return memoryThresholdReached.get();
+  }
+
+  protected void setMemoryThresholdReached(boolean reached) {
+    this.memoryThresholdReached.set(reached);
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java
new file mode 100644
index 0000000..3735a0d
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.geode.distributed.DistributedMember;
+
+public class MemoryThresholdInfo {
+  private final boolean memoryThresholdReached;
+  private final Set<DistributedMember> membersThatReachedThreshold;
+  private static final MemoryThresholdInfo NOT_REACHED = new MemoryThresholdInfo(false,
+      Collections.EMPTY_SET);
+
+  MemoryThresholdInfo(boolean memoryThresholdReached,
+      Set<DistributedMember> membersThatReachedThreshold) {
+    this.memoryThresholdReached = memoryThresholdReached;
+    this.membersThatReachedThreshold = membersThatReachedThreshold;
+  }
+
+  public Set<DistributedMember> getMembersThatReachedThreshold() {
+    return membersThatReachedThreshold;
+  }
+
+  public boolean isMemoryThresholdReached() {
+    return memoryThresholdReached;
+  }
+
+  static MemoryThresholdInfo getNotReached() {
+    return NOT_REACHED;
+  }
+
+  @Override
+  public String toString() {
+    return "MemoryThresholdInfo{" +
+        "memoryThresholdReached=" + memoryThresholdReached +
+        ", membersThatReachedThreshold=" + membersThatReachedThreshold +
+        '}';
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 77175a3..86b35dc 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -154,17 +154,13 @@ import org.apache.geode.distributed.internal.membership.MemberAttributes;
 import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.NanoTimer;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketAdvisor.ServerBucketProfile;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import org.apache.geode.internal.cache.DestroyPartitionedRegionMessage.DestroyPartitionedRegionResponse;
 import org.apache.geode.internal.cache.PutAllPartialResultException.PutAllPartialResult;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
 import org.apache.geode.internal.cache.control.MemoryEvent;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.eviction.HeapEvictor;
 import org.apache.geode.internal.cache.execute.AbstractExecution;
@@ -3617,16 +3613,7 @@ public class PartitionedRegion extends LocalRegion
     InternalDistributedMember targetNode = null;
     if (function.optimizeForWrite()) {
       targetNode = createBucket(bucketId, 0, null /* retryTimeKeeper */);
-      HeapMemoryMonitor hmm =
-          ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
-      if (hmm.isMemberHeapCritical(targetNode)
-          && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-        Set<DistributedMember> sm = Collections.singleton((DistributedMember) targetNode);
-        throw new LowMemoryException(
-            LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                .toLocalizedString(function.getId(), sm),
-            sm);
-      }
+      cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, targetNode);
     } else {
       targetNode = getOrCreateNodeForBucketRead(bucketId);
     }
@@ -3796,16 +3783,8 @@ public class PartitionedRegion extends LocalRegion
     }
 
     Set<InternalDistributedMember> dest = memberToBuckets.keySet();
-    if (function.optimizeForWrite()
-        && cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), sm),
-          sm);
-    }
+    cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function,
+        Collections.unmodifiableSet(dest));
 
     boolean isSelf = false;
     execution.setExecutionNodes(dest);
@@ -9295,7 +9274,7 @@ public class PartitionedRegion extends LocalRegion
   }
 
   @Override
-  public void removeMemberFromCriticalList(DistributedMember member) {
+  public void removeCriticalMember(DistributedMember member) {
     if (logger.isDebugEnabled()) {
       logger.debug("PR: removing member {} from critical member list", member);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index f187f2e..5bbc27a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -18,6 +18,8 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryPoolMXBean;
 import java.lang.management.MemoryType;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -36,9 +38,11 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
 import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState;
@@ -733,20 +737,54 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
     });
   }
 
-  /**
-   * Given a set of members, determine if any member in the set is above critical threshold.
-   *
-   * @param members The set of members to check.
-   * @return True if the set contains a member above critical threshold, false otherwise
-   */
-  public boolean containsHeapCriticalMembers(final Set<InternalDistributedMember> members) {
-    if (members.contains(this.cache.getMyId()) && this.mostRecentEvent.getState().isCritical()) {
-      return true;
+  protected Set<DistributedMember> getHeapCriticalMembersFrom(Set<DistributedMember> members) {
+    Set<DistributedMember> criticalMembers = getCriticalMembers();
+    criticalMembers.retainAll(members);
+    return criticalMembers;
+  }
+
+  private Set<DistributedMember> getCriticalMembers() {
+    Set<DistributedMember> criticalMembers = new HashSet<>(resourceAdvisor.adviseCriticalMembers());
+    if (this.mostRecentEvent.getState().isCritical()) {
+      criticalMembers.add(cache.getMyId());
+    }
+    return criticalMembers;
+  }
+
+  public void checkForLowMemory(Function function, DistributedMember targetMember) {
+    Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
+    checkForLowMemory(function, targetMembers);
+  }
+
+  public void checkForLowMemory(Function function, Set<DistributedMember> dest) {
+    LowMemoryException exception = createLowMemoryIfNeeded(function, dest);
+    if (exception != null) {
+      throw exception;
     }
+  }
+
+  public LowMemoryException createLowMemoryIfNeeded(Function function,
+      DistributedMember targetMember) {
+    Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
+    return createLowMemoryIfNeeded(function, targetMembers);
+  }
 
-    return SetUtils.intersectsWith(members, this.resourceAdvisor.adviseCritialMembers());
+  public LowMemoryException createLowMemoryIfNeeded(Function function,
+      Set<DistributedMember> memberSet) {
+    if (function.optimizeForWrite()
+        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
+      Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
+      if (!criticalMembersFrom.isEmpty()) {
+        return new LowMemoryException(
+            LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
+                .toLocalizedString(function.getId(), criticalMembersFrom),
+            criticalMembersFrom);
+      }
+    }
+    return null;
   }
 
+
   /**
    * Determines if the given member is in a heap critical state.
    *
@@ -761,6 +799,16 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
     return this.resourceAdvisor.isHeapCritical(member);
   }
 
+  protected MemoryEvent getMostRecentEvent() {
+    return mostRecentEvent;
+  }
+
+  protected HeapMemoryMonitor setMostRecentEvent(
+      MemoryEvent mostRecentEvent) {
+    this.mostRecentEvent = mostRecentEvent;
+    return this;
+  }
+
   class LocalHeapStatListener implements LocalStatListener {
     /*
      * (non-Javadoc)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
index 46e8a82..84da640 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
@@ -425,7 +425,7 @@ public class ResourceAdvisor extends DistributionAdvisor {
    *
    * @return a mutable set of members in the critical state otherwise {@link Collections#EMPTY_SET}
    */
-  public Set<InternalDistributedMember> adviseCritialMembers() {
+  public Set<InternalDistributedMember> adviseCriticalMembers() {
     return adviseFilter(new Filter() {
       @Override
       public boolean include(Profile profile) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
index e48d79c..46ac4ea 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
@@ -29,6 +29,8 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.MemoryThresholdInfo;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
@@ -318,15 +320,15 @@ public class DistributedRegionFunctionExecutor extends AbstractExecution {
       }
     }
     if (!MemoryThresholds.isLowMemoryExceptionDisabled() && function.optimizeForWrite()) {
-      try {
-        region.checkIfAboveThreshold(null);
-      } catch (LowMemoryException ignore) {
-        Set<DistributedMember> htrm = region.getMemoryThresholdReachedMembers();
+      MemoryThresholdInfo info = region.getAtomicThresholdInfo();
+      if (info.isMemoryThresholdReached()) {
+        InternalResourceManager.getInternalResourceManager(region.getCache()).getHeapMonitor()
+            .updateStateAndSendEvent();
+        Set<DistributedMember> criticalMembers = info.getMembersThatReachedThreshold();
         throw new LowMemoryException(
             LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                .toLocalizedString(function.getId(), htrm),
-            htrm);
-
+                .toLocalizedString(function.getId(), criticalMembers),
+            criticalMembers);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index f6f1098..16f74f8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -20,7 +20,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
 import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.execute.Execution;
@@ -34,10 +33,8 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 public class MemberFunctionExecutor extends AbstractExecution {
@@ -161,7 +158,10 @@ public class MemberFunctionExecutor extends AbstractExecution {
   @Override
   public void validateExecution(final Function function, final Set dest) {
     final InternalCache cache = GemFireCacheImpl.getInstance();
-    if (cache != null && cache.getTxManager().getTXState() != null) {
+    if (cache == null) {
+      return;
+    }
+    if (cache.getTxManager().getTXState() != null) {
       if (dest.size() > 1) {
         throw new TransactionException(
             LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -179,16 +179,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
         }
       }
     }
-    if (function.optimizeForWrite() && cache != null
-        && cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(new Object[] {function.getId(), sm}),
-          sm);
-    }
+    cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, dest);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
index 66ccfe8..8be821b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache.execute;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -21,7 +22,6 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
 import org.apache.geode.cache.TransactionException;
@@ -34,13 +34,11 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 public class MultiRegionFunctionExecutor extends AbstractExecution {
@@ -207,15 +205,9 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
               .toLocalizedString(function.getId()));
     }
     final InternalCache cache = GemFireCacheImpl.getInstance();
-    if (function.optimizeForWrite() && cache != null
-        && cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), sm),
-          sm);
+    if (cache != null) {
+      cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function,
+          Collections.unmodifiableSet(dest));
     }
     setExecutionNodes(dest);
 
@@ -367,7 +359,10 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
       cache = (InternalCache) r.getCache();
       break;
     }
-    if (cache != null && cache.getTxManager().getTXState() != null) {
+    if (cache == null) {
+      return;
+    }
+    if (cache.getTxManager().getTXState() != null) {
       if (targetMembers.size() > 1) {
         throw new TransactionException(
             LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -385,15 +380,6 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
         }
       }
     }
-    if (function.optimizeForWrite() && cache.getInternalResourceManager().getHeapMonitor()
-        .containsHeapCriticalMembers(targetMembers)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), sm),
-          sm);
-    }
+    cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, targetMembers);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
index 6e13ebc..b3ea80d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.execute;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.TransactionException;
@@ -26,12 +25,9 @@ import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 public class PartitionedRegionFunctionExecutor extends AbstractExecution {
@@ -329,7 +325,7 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution {
   @Override
   public void validateExecution(Function function, Set targetMembers) {
     InternalCache cache = pr.getGemFireCache();
-    if (cache != null && cache.getTxManager().getTXState() != null) {
+    if (cache.getTxManager().getTXState() != null) {
       if (targetMembers.size() > 1) {
         throw new TransactionException(
             LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -347,15 +343,6 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution {
         }
       }
     }
-    if (function.optimizeForWrite() && cache.getInternalResourceManager().getHeapMonitor()
-        .containsHeapCriticalMembers(targetMembers)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(new Object[] {function.getId(), sm}),
-          sm);
-    }
+    cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, targetMembers);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 7135e66..85a2473 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -454,7 +454,7 @@ public class PartitionedRegionRebalanceOp {
     int redundantCopies = leaderRegion.getRedundantCopies();
     int totalNumberOfBuckets = leaderRegion.getTotalNumberOfBuckets();
     Set<InternalDistributedMember> criticalMembers =
-        resourceManager.getResourceAdvisor().adviseCritialMembers();;
+        resourceManager.getResourceAdvisor().adviseCriticalMembers();;
     boolean removeOverRedundancy = true;
 
     debug("Building Model for rebalancing " + leaderRegion + ". redundantCopies=" + redundantCopies
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
index c6a9c30..6c8aa33 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
@@ -474,7 +474,7 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
 
     } else {
       ResourceAdvisor advisor = getPartitionedRegion().getCache().getResourceAdvisor();
-      boolean sick = advisor.adviseCritialMembers().contains(member);
+      boolean sick = advisor.adviseCriticalMembers().contains(member);
       if (logger.isDebugEnabled()) {
         logger.debug("updateBucketStatus:({}):member:{}:sick:{}",
             getPartitionedRegion().bucketStringForLogs(bucketId), member, sick);
@@ -1822,7 +1822,7 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
       logger.debug("RA: removing profile {}", profile);
     }
     if (getAdvisee() instanceof PartitionedRegion) {
-      ((PartitionedRegion) getAdvisee()).removeMemberFromCriticalList(profile.peerMemberId);
+      ((PartitionedRegion) getAdvisee()).removeCriticalMember(profile.peerMemberId);
     }
 
     if (this.buckets != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
index 1bb0367..7415d37 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
@@ -15,22 +15,15 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.execute.FunctionContextImpl;
 import org.apache.geode.internal.cache.execute.FunctionStats;
 import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -159,16 +152,8 @@ public class ExecuteFunction extends BaseCommand {
               + "with context :" + context.toString());
         }
 
-        HeapMemoryMonitor hmm =
-            ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
-        if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
-            && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-          Set<DistributedMember> sm = Collections.<DistributedMember>singleton(cache.getMyId());
-          throw new LowMemoryException(
-              LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                  .toLocalizedString(new Object[] {functionObject.getId(), sm}),
-              sm);
-        }
+        cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(functionObject,
+            cache.getMyId());
         functionObject.execute(context);
         stats.endFunctionExecution(startExecution, functionObject.hasResult());
       } catch (FunctionException functionException) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
index 23e2f17..2e0fa38 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
@@ -15,22 +15,15 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.execute.AbstractExecution;
 import org.apache.geode.internal.cache.execute.FunctionContextImpl;
 import org.apache.geode.internal.cache.execute.FunctionStats;
@@ -190,16 +183,9 @@ public class ExecuteFunction65 extends BaseCommand {
               context);
         }
 
-        HeapMemoryMonitor hmm =
-            ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
-        if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
-            && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-          Set<DistributedMember> sm = Collections.singleton((DistributedMember) cache.getMyId());
-          Exception e = new LowMemoryException(
-              LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                  .toLocalizedString(new Object[] {functionObject.getId(), sm}),
-              sm);
-
+        Exception e = cache.getInternalResourceManager().getHeapMonitor()
+            .createLowMemoryIfNeeded(functionObject, cache.getMyId());
+        if (e != null) {
           sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e);
           return;
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
index be4749f..6ef3576 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
@@ -15,22 +15,18 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.client.internal.ConnectionImpl;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -39,9 +35,6 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.execute.AbstractExecution;
 import org.apache.geode.internal.cache.execute.FunctionContextImpl;
 import org.apache.geode.internal.cache.execute.FunctionStats;
@@ -229,16 +222,9 @@ public class ExecuteFunction66 extends BaseCommand {
               context);
         }
 
-        HeapMemoryMonitor hmm =
-            ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
-        if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
-            && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-          Set<DistributedMember> sm = Collections.singleton((DistributedMember) cache.getMyId());
-          Exception e = new LowMemoryException(
-              LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                  .toLocalizedString(new Object[] {functionObject.getId(), sm}),
-              sm);
-
+        Exception e = cache.getInternalResourceManager().getHeapMonitor()
+            .createLowMemoryIfNeeded(functionObject, cache.getMyId());
+        if (e != null) {
           sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e);
           return;
         }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 12f9dfe..2f58510 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.event.BulkOperationHolder;
 import org.apache.geode.internal.cache.event.EventTracker;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
@@ -154,4 +156,34 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe
     }
   }
 
+  @Test
+  public void testThatMemoryThresholdInfoRelectsStateOfRegion() {
+    InternalDistributedMember internalDM = mock(InternalDistributedMember.class);
+    DistributedRegion distRegion = prepare(true, false);
+    distRegion.addCriticalMember(internalDM);
+
+    MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
+
+    assertThat(distRegion.isMemoryThresholdReached()).isTrue();
+    assertThat(distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
+        .containsExactly(internalDM);
+    assertThat(info.isMemoryThresholdReached()).isTrue();
+    assertThat(info.getMembersThatReachedThreshold()).containsExactly(internalDM);
+  }
+
+  @Test
+  public void testThatMemoryThresholdInfoDoesNotChangeWhenRegionChanges() {
+    InternalDistributedMember internalDM = mock(InternalDistributedMember.class);
+    DistributedRegion distRegion = prepare(true, false);
+
+    MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
+    distRegion.addCriticalMember(internalDM);
+
+    assertThat(distRegion.isMemoryThresholdReached()).isTrue();
+    assertThat(distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
+        .containsExactly(internalDM);
+    assertThat(info.isMemoryThresholdReached()).isFalse();
+    assertThat(info.getMembersThatReachedThreshold()).isEmpty();
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java
new file mode 100644
index 0000000..5a60710
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class MemoryThresholdInfoTest {
+
+  @Test
+  public void getNotReachedReturnsIdenticalResults() {
+    MemoryThresholdInfo info1 = MemoryThresholdInfo.getNotReached();
+    MemoryThresholdInfo info2 = MemoryThresholdInfo.getNotReached();
+
+    assertThat(info1).isSameAs(info2);
+    assertThat(info1.getMembersThatReachedThreshold())
+        .isSameAs(info2.getMembersThatReachedThreshold());
+  }
+
+  @Test
+  public void getNotReachedReturnsEmptyMembersReached() {
+    MemoryThresholdInfo info1 = MemoryThresholdInfo.getNotReached();
+
+    assertThat(info1.getMembersThatReachedThreshold()).isEmpty();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java
new file mode 100644
index 0000000..f5ddc52
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+
+/**
+ * Unit tests for {@code HeapMemoryMonitor.getHeapCriticalMembersFrom},
+ * {@code createLowMemoryIfNeeded} and {@code checkForLowMemory}.
+ *
+ * <p>
+ * The advised critical members come from a mocked {@link ResourceAdvisor}; the local member's
+ * state is set through {@code setMostRecentEvent}; the static
+ * {@code MemoryThresholds.isLowMemoryExceptionDisabled()} flag is stubbed with PowerMock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({MemoryThresholds.class})
+public class HeapMemoryMonitorTest {
+
+  private HeapMemoryMonitor heapMonitor;
+  private Function function;
+  private Set memberSet;
+  private DistributedMember member;
+  private InternalDistributedMember myself;
+  private ResourceAdvisor resourceAdvisor;
+  private static final String LOW_MEMORY_REGEX =
+      "Function: null cannot be executed because the members.*are running low on memory";
+
+  @Before
+  public void setup() {
+    InternalCache internalCache = mock(InternalCache.class);
+    DistributedSystem distributedSystem = mock(DistributedSystem.class);
+    function = mock(Function.class);
+    member = mock(InternalDistributedMember.class);
+    myself = mock(InternalDistributedMember.class);
+    resourceAdvisor = mock(ResourceAdvisor.class);
+
+    when(internalCache.getDistributedSystem()).thenReturn(distributedSystem);
+    when(internalCache.getDistributionAdvisor()).thenReturn(resourceAdvisor);
+    when(internalCache.getMyId()).thenReturn(myself);
+
+    heapMonitor = new HeapMemoryMonitor(null, internalCache, null);
+    memberSet = new HashSet<>();
+    memberSet.add(member);
+    heapMonitor.setMostRecentEvent(new MemoryEvent(InternalResourceManager.ResourceType.HEAP_MEMORY,
+        MemoryThresholds.MemoryState.DISABLED, MemoryThresholds.MemoryState.DISABLED, null, 0L,
+        true, null)); // myself is not critical
+  }
+
+  // ========== tests for getHeapCriticalMembersFrom ==========
+  @Test
+  public void getHeapCriticalMembersFrom_WithEmptyCriticalMembersReturnsEmptySet() {
+    getHeapCriticalMembersFrom_returnsEmptySet(Collections.emptySet(), memberSet);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_WithEmptyArgReturnsEmptySet() {
+    getHeapCriticalMembersFrom_returnsEmptySet(memberSet, Collections.emptySet());
+  }
+
+  @Test
+  public void getHeapCriticalMembersFromWithEmptySetsReturnsEmptySet() {
+    getHeapCriticalMembersFrom_returnsEmptySet(Collections.emptySet(), Collections.emptySet());
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_WithDisjointSetsReturnsEmptySet() {
+    Set argSet = new HashSet();
+    argSet.add(mock(InternalDistributedMember.class));
+
+    getHeapCriticalMembersFrom_returnsEmptySet(memberSet, argSet);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_WithEqualSetsReturnsMember() {
+    getHeapCriticalMembersFrom_returnsNonEmptySet(memberSet, Collections.unmodifiableSet(memberSet),
+        new HashSet(memberSet));
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_ReturnsMultipleMembers() {
+    DistributedMember member1 = mock(InternalDistributedMember.class);
+    DistributedMember member2 = mock(InternalDistributedMember.class);
+    DistributedMember member3 = mock(InternalDistributedMember.class);
+    DistributedMember member4 = mock(InternalDistributedMember.class);
+    Set advisorSet = new HashSet();
+    advisorSet.add(member1);
+    advisorSet.add(member2);
+    advisorSet.add(member4);
+    Set argSet = new HashSet(memberSet);
+    argSet.add(member1);
+    argSet.add(member3);
+    argSet.add(member4);
+    Set expectedResult = new HashSet();
+    expectedResult.add(member1);
+    expectedResult.add(member4);
+
+    getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet, argSet, expectedResult);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_DoesNotReturnMyselfWhenNotCritical() {
+    Set expectedResult = new HashSet(memberSet);
+    Set advisorSet = new HashSet(memberSet);
+    memberSet.add(myself);
+
+    getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet,
+        Collections.unmodifiableSet(memberSet),
+        expectedResult);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_IncludesMyselfWhenCritical() throws Exception {
+    Set advisorSet = new HashSet(memberSet);
+    heapMonitor.setMostRecentEvent(new MemoryEvent(InternalResourceManager.ResourceType.HEAP_MEMORY,
+        MemoryThresholds.MemoryState.DISABLED, MemoryThresholds.MemoryState.CRITICAL, null, 0L,
+        true, null));
+    memberSet.add(myself);
+
+    getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet,
+        Collections.unmodifiableSet(memberSet),
+        new HashSet(memberSet));
+  }
+
+  // ========== tests for createLowMemoryIfNeeded (with Set argument) ==========
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenNotOptimizedForWrite()
+      throws Exception {
+    createLowMemoryIfNeededWithSetArg_returnsNull(false, false, memberSet);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    createLowMemoryIfNeededWithSetArg_returnsNull(true, true, memberSet);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenNoCriticalMembers()
+      throws Exception {
+    createLowMemoryIfNeededWithSetArg_returnsNull(true, false, Collections.emptySet());
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberSet);
+
+    assertLowMemoryException(exception);
+  }
+
+  // ========== tests for createLowMemoryIfNeeded (with DistributedMember argument) ==========
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNotOptimizedForWrite()
+      throws Exception {
+    createLowMemoryIfNeededWithMemberArg_returnsNull(false, false, memberSet, member);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    createLowMemoryIfNeededWithMemberArg_returnsNull(true, true, memberSet, member);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNoCriticalMembers()
+      throws Exception {
+    createLowMemoryIfNeededWithMemberArg_returnsNull(true, false, Collections.emptySet(), member);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, member);
+
+    assertLowMemoryException(exception);
+  }
+
+  // ========== tests for checkForLowMemory (with Set argument) ==========
+  @Test
+  public void checkForLowMemoryWithSetArg_DoesNotThrowWhenNotOptimizedForWrite() throws Exception {
+    checkForLowMemoryWithSetArg_doesNotThrow(false, false, memberSet);
+  }
+
+  @Test
+  public void checkForLowMemoryWithSetArg_DoesNotThrowWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    checkForLowMemoryWithSetArg_doesNotThrow(true, true, memberSet);
+  }
+
+  @Test
+  public void checkForLowMemoryWithSetArg_DoesNotThrowWhenNoCriticalMembers() throws Exception {
+    checkForLowMemoryWithSetArg_doesNotThrow(true, false, Collections.emptySet());
+  }
+
+  @Test
+  public void checkForLowMemoryWithSetArg_ThrowsLowMemoryException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    assertThatThrownBy(() -> heapMonitor.checkForLowMemory(function, memberSet))
+        .isExactlyInstanceOf(LowMemoryException.class).hasMessageMatching(LOW_MEMORY_REGEX);
+  }
+
+  // ========== tests for checkForLowMemory (with DistributedMember argument) ==========
+  // Note: the "...ReturnsNull..." tests in this section pass when checkForLowMemory completes
+  // without throwing; no return value is asserted.
+  @Test
+  public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNotOptimizedForWrite()
+      throws Exception {
+    checkForLowMemoryWithMemberArg_doesNotThrow(false, false, memberSet, member);
+  }
+
+  @Test
+  public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    checkForLowMemoryWithMemberArg_doesNotThrow(true, true, memberSet, member);
+  }
+
+  @Test
+  public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNoCriticalMembers()
+      throws Exception {
+    checkForLowMemoryWithMemberArg_doesNotThrow(true, false, Collections.emptySet(), member);
+  }
+
+  @Test
+  public void checkForLowMemoryWithMemberArg_ReturnsException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    assertThatThrownBy(() -> heapMonitor.checkForLowMemory(function, member))
+        .isExactlyInstanceOf(LowMemoryException.class).hasMessageMatching(LOW_MEMORY_REGEX);
+  }
+
+  // ========== private methods ==========
+  private void getHeapCriticalMembersFrom_returnsEmptySet(Set adviseCriticalMembers, Set argSet) {
+    when(resourceAdvisor.adviseCriticalMembers()).thenReturn(adviseCriticalMembers);
+
+    Set<DistributedMember> criticalMembers = heapMonitor.getHeapCriticalMembersFrom(argSet);
+
+    assertThat(criticalMembers).isEmpty();
+  }
+
+  private void getHeapCriticalMembersFrom_returnsNonEmptySet(Set adviseCriticalMembers, Set argSet,
+      Set expectedResult) {
+    when(resourceAdvisor.adviseCriticalMembers()).thenReturn(adviseCriticalMembers);
+
+    Set<DistributedMember> criticalMembers = heapMonitor.getHeapCriticalMembersFrom(argSet);
+
+    // Exact match rather than a superset check, so an unexpected extra member (for example a
+    // non-critical "myself") fails the test.
+    assertThat(criticalMembers).isEqualTo(expectedResult);
+  }
+
+  private void createLowMemoryIfNeededWithSetArg_returnsNull(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, Set memberSetArg) throws Exception {
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, memberSetArg);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberSetArg);
+
+    assertThat(exception).isNull();
+  }
+
+  private void createLowMemoryIfNeededWithMemberArg_returnsNull(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, Set criticalMembers, DistributedMember memberArg)
+      throws Exception {
+    // criticalMembers is what the mocked advisor reports. Flag tests pass a set containing
+    // memberArg so that only the flag under test can explain the null result.
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, criticalMembers);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberArg);
+
+    assertThat(exception).isNull();
+  }
+
+  private void checkForLowMemoryWithSetArg_doesNotThrow(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, Set memberSetArg) throws Exception {
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, memberSetArg);
+
+    heapMonitor.checkForLowMemory(function, memberSetArg);
+  }
+
+  private void checkForLowMemoryWithMemberArg_doesNotThrow(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, Set criticalMembers, DistributedMember memberArg)
+      throws Exception {
+    // criticalMembers is what the mocked advisor reports. Flag tests pass a set containing
+    // memberArg so that only the flag under test can explain the absence of an exception.
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, criticalMembers);
+
+    heapMonitor.checkForLowMemory(function, memberArg);
+  }
+
+  private void setMocking(boolean optimizeForWrite, boolean isLowMemoryExceptionDisabled,
+      Set argSet) throws Exception {
+    when(function.optimizeForWrite()).thenReturn(optimizeForWrite);
+    setIsLowMemoryExceptionDisabled(isLowMemoryExceptionDisabled);
+    when(resourceAdvisor.adviseCriticalMembers()).thenReturn(argSet);
+  }
+
+  private void assertLowMemoryException(LowMemoryException exception) {
+    assertThat(exception).isExactlyInstanceOf(LowMemoryException.class);
+    assertThat(exception.getMessage()).containsPattern(LOW_MEMORY_REGEX);
+  }
+
+  private void setIsLowMemoryExceptionDisabled(boolean isLowMemoryExceptionDisabled)
+      throws Exception {
+    PowerMockito.mockStatic(MemoryThresholds.class);
+    PowerMockito.when(MemoryThresholds.class, MemoryThresholds.isLowMemoryExceptionDisabled())
+        .thenReturn(isLowMemoryExceptionDisabled);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
index a61e49f..fc234c2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
@@ -45,6 +45,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
@@ -124,6 +125,7 @@ public class ExecuteFunction66Test {
     when(this.cache.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
     when(this.cache.getDistributedSystem()).thenReturn(mock(InternalDistributedSystem.class));
     when(this.cache.getResourceManager()).thenReturn(this.internalResourceManager);
+    when(this.cache.getInternalResourceManager()).thenReturn(this.internalResourceManager);
 
     when(this.callbackArgPart.getObject()).thenReturn(CALLBACK_ARG);
 
@@ -149,6 +151,8 @@ public class ExecuteFunction66Test {
     when(this.serverConnection.getAcceptor()).thenReturn(this.acceptor);
     when(this.serverConnection.getHandshake()).thenReturn(mock(ServerSideHandshake.class));
 
+    when(this.internalResourceManager.getHeapMonitor()).thenReturn(mock(HeapMemoryMonitor.class));
+
     PowerMockito.mockStatic(FunctionService.class);
     PowerMockito.when(FunctionService.getFunction(eq(FUNCTION))).thenReturn(this.functionObject);
   }