You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by lg...@apache.org on 2018/08/24 15:58:32 UTC
[geode] branch develop updated: GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)
This is an automated email from the ASF dual-hosted git repository.
lgallinat pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 583d141 GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)
583d141 is described below
commit 583d1416644a8392ed1bf257b87025a9071aaa55
Author: Lynn Gallinat <lg...@pivotal.io>
AuthorDate: Fri Aug 24 08:58:27 2018 -0700
GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)
* GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and
DistributedRegion.memoryThresholdReachedMembers must be atomic.
---
.../management/MemoryThresholdsDUnitTest.java | 2 +-
.../MemoryThresholdsOffHeapDUnitTest.java | 18 +-
.../cache/query/internal/DefaultQueryService.java | 17 +-
.../internal/cache/CacheDistributionAdvisor.java | 2 +-
.../geode/internal/cache/DistributedRegion.java | 41 +--
.../geode/internal/cache/InternalRegion.java | 2 +
.../apache/geode/internal/cache/LocalRegion.java | 63 ++--
.../geode/internal/cache/MemoryThresholdInfo.java | 53 ++++
.../geode/internal/cache/PartitionedRegion.java | 29 +-
.../internal/cache/control/HeapMemoryMonitor.java | 70 ++++-
.../internal/cache/control/ResourceAdvisor.java | 2 +-
.../execute/DistributedRegionFunctionExecutor.java | 16 +-
.../cache/execute/MemberFunctionExecutor.java | 19 +-
.../cache/execute/MultiRegionFunctionExecutor.java | 32 +-
.../execute/PartitionedRegionFunctionExecutor.java | 17 +-
.../partitioned/PartitionedRegionRebalanceOp.java | 2 +-
.../internal/cache/partitioned/RegionAdvisor.java | 4 +-
.../tier/sockets/command/ExecuteFunction.java | 19 +-
.../tier/sockets/command/ExecuteFunction65.java | 20 +-
.../tier/sockets/command/ExecuteFunction66.java | 20 +-
.../internal/cache/DistributedRegionJUnitTest.java | 32 ++
.../internal/cache/MemoryThresholdInfoTest.java | 40 +++
.../cache/control/HeapMemoryMonitorTest.java | 325 +++++++++++++++++++++
.../sockets/command/ExecuteFunction66Test.java | 4 +
24 files changed, 638 insertions(+), 211 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
index d8cfec4..79c522e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
@@ -303,7 +303,7 @@ public class MemoryThresholdsDUnitTest extends ClientServerTestCase {
public boolean done() {
DistributedRegion dr = (DistributedRegion) getRootRegion().getSubregion(regionName);
- return dr.getMemoryThresholdReachedMembers().size() == 0;
+ return dr.getAtomicThresholdInfo().getMembersThatReachedThreshold().size() == 0;
}
};
Wait.waitForCriterion(wc, 30000, 10, true);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
index 330b974..907456c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
@@ -494,7 +494,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
public boolean done() {
DistributedRegion dr = (DistributedRegion) getRootRegion().getSubregion(regionName);
- return dr.getMemoryThresholdReachedMembers().size() == 0;
+ return dr.getAtomicThresholdInfo().getMembersThatReachedThreshold().size() == 0;
}
};
Wait.waitForCriterion(wc, 10000, 10, true);
@@ -602,7 +602,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
}
public boolean done() {
- return r.memoryThresholdReached.get();
+ return r.isMemoryThresholdReached();
}
};
Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -618,7 +618,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
}
public boolean done() {
- return !r.memoryThresholdReached.get();
+ return !r.isMemoryThresholdReached();
}
};
Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -670,7 +670,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
}
public boolean done() {
- return r.memoryThresholdReached.get();
+ return r.isMemoryThresholdReached();
}
};
Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -686,7 +686,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
}
public boolean done() {
- return !r.memoryThresholdReached.get();
+ return !r.isMemoryThresholdReached();
}
};
Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1235,7 +1235,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
}
public boolean done() {
- return r.memoryThresholdReached.get();
+ return r.isMemoryThresholdReached();
}
};
Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1258,7 +1258,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
}
public boolean done() {
- return !r.memoryThresholdReached.get();
+ return !r.isMemoryThresholdReached();
}
};
Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1514,12 +1514,12 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
@Override
public String description() {
return "Expected to go critical: isCritical=" + ohm.getState().isCritical()
- + " memoryThresholdReached=" + r.memoryThresholdReached.get();
+ + " memoryThresholdReached=" + r.isMemoryThresholdReached();
}
@Override
public boolean done() {
- return ohm.getState().isCritical() && r.memoryThresholdReached.get();
+ return ohm.getState().isCritical() && r.isMemoryThresholdReached();
}
};
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
index 2d15795..e476d45 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
@@ -68,7 +68,9 @@ import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.MemoryThresholdInfo;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -226,12 +228,15 @@ public class DefaultQueryService implements InternalQueryService {
// UnsupportedOperationException(LocalizedStrings.DefaultQueryService_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_REGIONS_WHICH_OVERFLOW_TO_DISK_THE_REGION_INVOLVED_IS_0.toLocalizedString(regionPath));
// }
// if its a pr the create index on all of the local buckets.
- if (((LocalRegion) region).memoryThresholdReached.get()
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- LocalRegion lr = (LocalRegion) region;
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_INDEX.toLocalizedString(region.getName()),
- lr.getMemoryThresholdReachedMembers());
+ if (!MemoryThresholds.isLowMemoryExceptionDisabled()) {
+ InternalRegion internalRegion = (InternalRegion) region;
+ MemoryThresholdInfo info = internalRegion.getAtomicThresholdInfo();
+ if (info.isMemoryThresholdReached()) {
+ throw new LowMemoryException(
+ LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_INDEX
+ .toLocalizedString(region.getName()),
+ info.getMembersThatReachedThreshold());
+ }
}
if (region instanceof PartitionedRegion) {
try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 6a726ce..2a74e7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -1103,7 +1103,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
logger.debug("CDA: removing profile {}", profile);
}
if (getAdvisee() instanceof LocalRegion && profile != null) {
- ((LocalRegion) getAdvisee()).removeMemberFromCriticalList(profile.getDistributedMember());
+ ((LocalRegion) getAdvisee()).removeCriticalMember(profile.getDistributedMember());
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 9b395fc..d9f5903 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1113,7 +1112,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
/**
* A reference counter to protect the memoryThresholdReached boolean
*/
- private final Set<DistributedMember> memoryThresholdReachedMembers = new CopyOnWriteArraySet<>();
+ private final Set<DistributedMember> memoryThresholdReachedMembers = new HashSet<>();
// TODO: cleanup getInitialImageAndRecovery
private void getInitialImageAndRecovery(InputStream snapshotInputStream,
@@ -3751,42 +3750,35 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
if (event.getState().isCritical() && !event.getPreviousState().isCritical()
&& (event.getType() == ResourceType.HEAP_MEMORY
|| (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
- setMemoryThresholdReachedCounterTrue(event.getMember());
+ addCriticalMember(event.getMember());
} else if (!event.getState().isCritical() && event.getPreviousState().isCritical()
&& (event.getType() == ResourceType.HEAP_MEMORY
|| (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
- removeMemberFromCriticalList(event.getMember());
+ removeCriticalMember(event.getMember());
}
}
}
@Override
- public void removeMemberFromCriticalList(DistributedMember member) {
+ public void removeCriticalMember(DistributedMember member) {
if (logger.isDebugEnabled()) {
logger.debug("DR: removing member {} from critical member list", member);
}
synchronized (this.memoryThresholdReachedMembers) {
this.memoryThresholdReachedMembers.remove(member);
- if (this.memoryThresholdReachedMembers.size() == 0) {
- memoryThresholdReached.set(false);
+ if (this.memoryThresholdReachedMembers.isEmpty()) {
+ setMemoryThresholdReached(false);
}
}
}
@Override
- public Set<DistributedMember> getMemoryThresholdReachedMembers() {
- synchronized (this.memoryThresholdReachedMembers) {
- return Collections.unmodifiableSet(this.memoryThresholdReachedMembers);
- }
- }
-
- @Override
public void initialCriticalMembers(boolean localMemoryIsCritical,
Set<InternalDistributedMember> criticalMembers) {
Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
for (InternalDistributedMember idm : criticalMembers) {
if (others.contains(idm)) {
- setMemoryThresholdReachedCounterTrue(idm);
+ addCriticalMember(idm);
}
}
}
@@ -3794,12 +3786,23 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
/**
* @param idm member whose threshold has been exceeded
*/
- private void setMemoryThresholdReachedCounterTrue(final DistributedMember idm) {
+ protected void addCriticalMember(final DistributedMember idm) {
synchronized (this.memoryThresholdReachedMembers) {
- this.memoryThresholdReachedMembers.add(idm);
- if (this.memoryThresholdReachedMembers.size() > 0) {
- memoryThresholdReached.set(true);
+ if (this.memoryThresholdReachedMembers.isEmpty()) {
+ setMemoryThresholdReached(true);
}
+ this.memoryThresholdReachedMembers.add(idm);
+ }
+ }
+
+ @Override
+ public MemoryThresholdInfo getAtomicThresholdInfo() {
+ if (!isMemoryThresholdReached()) {
+ return MemoryThresholdInfo.getNotReached();
+ }
+ synchronized (memoryThresholdReachedMembers) {
+ return new MemoryThresholdInfo(isMemoryThresholdReached(),
+ new HashSet<DistributedMember>(memoryThresholdReachedMembers));
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 8b887ef..78a664e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -408,4 +408,6 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
default void handleWANEvent(EntryEventImpl event) {}
+ MemoryThresholdInfo getAtomicThresholdInfo();
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 482bf0c..92c2e3c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -488,7 +488,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* This boolean is true when a member who has this region is running low on memory. It is used to
* reject region operations.
*/
- public final AtomicBoolean memoryThresholdReached = new AtomicBoolean(false);
+ private final AtomicBoolean memoryThresholdReached = new AtomicBoolean(false);
/**
* Lock for updating PR MetaData on client side
@@ -1023,13 +1023,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
if (!newRegion.getOffHeap()) {
newRegion.initialCriticalMembers(
this.cache.getInternalResourceManager().getHeapMonitor().getState().isCritical(),
- this.cache.getResourceAdvisor().adviseCritialMembers());
+ this.cache.getResourceAdvisor().adviseCriticalMembers());
} else {
newRegion.initialCriticalMembers(
this.cache.getInternalResourceManager().getHeapMonitor().getState().isCritical()
|| this.cache.getInternalResourceManager().getOffHeapMonitor().getState()
.isCritical(),
- this.cache.getResourceAdvisor().adviseCritialMembers());
+ this.cache.getResourceAdvisor().adviseCriticalMembers());
}
// synchronization would be done on ManagementAdapter.regionOpLock
@@ -2912,7 +2912,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
protected boolean isMemoryThresholdReachedForLoad() {
- return this.memoryThresholdReached.get();
+ return isMemoryThresholdReached();
}
/**
@@ -5736,14 +5736,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* @throws LowMemoryException if the target member for this operation is sick
*/
private void checkIfAboveThreshold(final Object key) throws LowMemoryException {
- if (this.memoryThresholdReached.get()) {
- Set<DistributedMember> membersThatReachedThreshold = getMemoryThresholdReachedMembers();
-
+ MemoryThresholdInfo info = getAtomicThresholdInfo();
+ if (info.isMemoryThresholdReached()) {
+ Set<DistributedMember> membersThatReachedThreshold = info.getMembersThatReachedThreshold();
// #45603: trigger a background eviction since we're above the critical
// threshold
InternalResourceManager.getInternalResourceManager(this.cache).getHeapMonitor()
.updateStateAndSendEvent();
-
throw new LowMemoryException(
LocalizedStrings.ResourceManager_LOW_MEMORY_IN_0_FOR_PUT_1_MEMBER_2.toLocalizedString(
getFullPath(), key, membersThatReachedThreshold),
@@ -5751,6 +5750,15 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
+ @Override
+ public MemoryThresholdInfo getAtomicThresholdInfo() {
+ if (!isMemoryThresholdReached()) {
+ return MemoryThresholdInfo.getNotReached();
+ }
+ return new MemoryThresholdInfo(isMemoryThresholdReached(),
+ Collections.singleton(this.cache.getMyId()));
+ }
+
/**
* Allows null as new value to accommodate create with a null value.
*
@@ -10726,13 +10734,16 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
final Function function, final Object args, final ResultCollector rc, final Set filter,
final ServerToClientFunctionResultSender sender) {
- if (function.optimizeForWrite() && this.memoryThresholdReached.get()
+ if (function.optimizeForWrite()
&& !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<DistributedMember> members = getMemoryThresholdReachedMembers();
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(function.getId(), members),
- members);
+ MemoryThresholdInfo info = getAtomicThresholdInfo();
+ if (info.isMemoryThresholdReached()) {
+ Set<DistributedMember> members = info.getMembersThatReachedThreshold();
+ throw new LowMemoryException(
+ LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
+ .toLocalizedString(function.getId(), members),
+ members);
+ }
}
final LocalResultCollector<?, ?> resultCollector =
execution.getLocalResultCollector(function, rc);
@@ -10747,13 +10758,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return resultCollector;
}
- /**
- * @return the set of members which are known to be critical
- */
- public Set<DistributedMember> getMemoryThresholdReachedMembers() {
- return Collections.singleton(this.cache.getMyId());
- }
-
@Override
public void onEvent(MemoryEvent event) {
if (logger.isDebugEnabled()) {
@@ -10769,11 +10773,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
&& (event.getType() == ResourceType.HEAP_MEMORY
|| (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
// start rejecting operations
- this.memoryThresholdReached.set(true);
+ setMemoryThresholdReached(true);
} else if (!event.getState().isCritical() && event.getPreviousState().isCritical()
&& (event.getType() == ResourceType.HEAP_MEMORY
|| (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
- this.memoryThresholdReached.set(false);
+ setMemoryThresholdReached(false);
}
}
}
@@ -10836,7 +10840,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* This method is meant to be overridden by DistributedRegion and PartitionedRegions to cleanup
* CRITICAL state
*/
- public void removeMemberFromCriticalList(DistributedMember member) {
+ public void removeCriticalMember(DistributedMember member) {
// should not be called for LocalRegion
Assert.assertTrue(false);
}
@@ -10857,7 +10861,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
Set<InternalDistributedMember> criticalMembers) {
assert getScope().isLocal();
if (localMemoryIsCritical) {
- this.memoryThresholdReached.set(true);
+ setMemoryThresholdReached(true);
}
}
@@ -12252,4 +12256,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
public Lock getClientMetaDataLock() {
return clientMetaDataLock;
}
+
+ public boolean isMemoryThresholdReached() {
+ return memoryThresholdReached.get();
+ }
+
+ protected void setMemoryThresholdReached(boolean reached) {
+ this.memoryThresholdReached.set(reached);
+ }
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java
new file mode 100644
index 0000000..3735a0d
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.geode.distributed.DistributedMember;
+
+public class MemoryThresholdInfo {
+ private final boolean memoryThresholdReached;
+ private final Set<DistributedMember> membersThatReachedThreshold;
+ private static final MemoryThresholdInfo NOT_REACHED = new MemoryThresholdInfo(false,
+ Collections.EMPTY_SET);
+
+ MemoryThresholdInfo(boolean memoryThresholdReached,
+ Set<DistributedMember> membersThatReachedThreshold) {
+ this.memoryThresholdReached = memoryThresholdReached;
+ this.membersThatReachedThreshold = membersThatReachedThreshold;
+ }
+
+ public Set<DistributedMember> getMembersThatReachedThreshold() {
+ return membersThatReachedThreshold;
+ }
+
+ public boolean isMemoryThresholdReached() {
+ return memoryThresholdReached;
+ }
+
+ static MemoryThresholdInfo getNotReached() {
+ return NOT_REACHED;
+ }
+
+ @Override
+ public String toString() {
+ return "MemoryThresholdInfo{" +
+ "memoryThresholdReached=" + memoryThresholdReached +
+ ", membersThatReachedThreshold=" + membersThatReachedThreshold +
+ '}';
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 77175a3..86b35dc 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -154,17 +154,13 @@ import org.apache.geode.distributed.internal.membership.MemberAttributes;
import org.apache.geode.i18n.StringId;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.NanoTimer;
-import org.apache.geode.internal.SetUtils;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.BucketAdvisor.ServerBucketProfile;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.DestroyPartitionedRegionMessage.DestroyPartitionedRegionResponse;
import org.apache.geode.internal.cache.PutAllPartialResultException.PutAllPartialResult;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.eviction.HeapEvictor;
import org.apache.geode.internal.cache.execute.AbstractExecution;
@@ -3617,16 +3613,7 @@ public class PartitionedRegion extends LocalRegion
InternalDistributedMember targetNode = null;
if (function.optimizeForWrite()) {
targetNode = createBucket(bucketId, 0, null /* retryTimeKeeper */);
- HeapMemoryMonitor hmm =
- ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
- if (hmm.isMemberHeapCritical(targetNode)
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<DistributedMember> sm = Collections.singleton((DistributedMember) targetNode);
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(function.getId(), sm),
- sm);
- }
+ cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, targetNode);
} else {
targetNode = getOrCreateNodeForBucketRead(bucketId);
}
@@ -3796,16 +3783,8 @@ public class PartitionedRegion extends LocalRegion
}
Set<InternalDistributedMember> dest = memberToBuckets.keySet();
- if (function.optimizeForWrite()
- && cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
- Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(function.getId(), sm),
- sm);
- }
+ cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function,
+ Collections.unmodifiableSet(dest));
boolean isSelf = false;
execution.setExecutionNodes(dest);
@@ -9295,7 +9274,7 @@ public class PartitionedRegion extends LocalRegion
}
@Override
- public void removeMemberFromCriticalList(DistributedMember member) {
+ public void removeCriticalMember(DistributedMember member) {
if (logger.isDebugEnabled()) {
logger.debug("PR: removing member {} from critical member list", member);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index f187f2e..5bbc27a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -18,6 +18,8 @@ import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -36,9 +38,11 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SetUtils;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState;
@@ -733,20 +737,54 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
});
}
- /**
- * Given a set of members, determine if any member in the set is above critical threshold.
- *
- * @param members The set of members to check.
- * @return True if the set contains a member above critical threshold, false otherwise
- */
- public boolean containsHeapCriticalMembers(final Set<InternalDistributedMember> members) {
- if (members.contains(this.cache.getMyId()) && this.mostRecentEvent.getState().isCritical()) {
- return true;
+ protected Set<DistributedMember> getHeapCriticalMembersFrom(Set<DistributedMember> members) {
+ Set<DistributedMember> criticalMembers = getCriticalMembers();
+ criticalMembers.retainAll(members);
+ return criticalMembers;
+ }
+
+ private Set<DistributedMember> getCriticalMembers() {
+ Set<DistributedMember> criticalMembers = new HashSet<>(resourceAdvisor.adviseCriticalMembers());
+ if (this.mostRecentEvent.getState().isCritical()) {
+ criticalMembers.add(cache.getMyId());
+ }
+ return criticalMembers;
+ }
+
+ public void checkForLowMemory(Function function, DistributedMember targetMember) {
+ Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
+ checkForLowMemory(function, targetMembers);
+ }
+
+ public void checkForLowMemory(Function function, Set<DistributedMember> dest) {
+ LowMemoryException exception = createLowMemoryIfNeeded(function, dest);
+ if (exception != null) {
+ throw exception;
}
+ }
+
+ public LowMemoryException createLowMemoryIfNeeded(Function function,
+ DistributedMember targetMember) {
+ Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
+ return createLowMemoryIfNeeded(function, targetMembers);
+ }
- return SetUtils.intersectsWith(members, this.resourceAdvisor.adviseCritialMembers());
+ public LowMemoryException createLowMemoryIfNeeded(Function function,
+ Set<DistributedMember> memberSet) {
+ if (function.optimizeForWrite()
+ && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
+ Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
+ if (!criticalMembersFrom.isEmpty()) {
+ return new LowMemoryException(
+ LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
+ .toLocalizedString(function.getId(), criticalMembersFrom),
+ criticalMembersFrom);
+ }
+ }
+ return null;
}
+
/**
* Determines if the given member is in a heap critical state.
*
@@ -761,6 +799,16 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
return this.resourceAdvisor.isHeapCritical(member);
}
+ protected MemoryEvent getMostRecentEvent() {
+ return mostRecentEvent;
+ }
+
+ protected HeapMemoryMonitor setMostRecentEvent(
+ MemoryEvent mostRecentEvent) {
+ this.mostRecentEvent = mostRecentEvent;
+ return this;
+ }
+
class LocalHeapStatListener implements LocalStatListener {
/*
* (non-Javadoc)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
index 46e8a82..84da640 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
@@ -425,7 +425,7 @@ public class ResourceAdvisor extends DistributionAdvisor {
*
* @return a mutable set of members in the critical state otherwise {@link Collections#EMPTY_SET}
*/
- public Set<InternalDistributedMember> adviseCritialMembers() {
+ public Set<InternalDistributedMember> adviseCriticalMembers() {
return adviseFilter(new Filter() {
@Override
public boolean include(Profile profile) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
index e48d79c..46ac4ea 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
@@ -29,6 +29,8 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.MemoryThresholdInfo;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -318,15 +320,15 @@ public class DistributedRegionFunctionExecutor extends AbstractExecution {
}
}
if (!MemoryThresholds.isLowMemoryExceptionDisabled() && function.optimizeForWrite()) {
- try {
- region.checkIfAboveThreshold(null);
- } catch (LowMemoryException ignore) {
- Set<DistributedMember> htrm = region.getMemoryThresholdReachedMembers();
+ MemoryThresholdInfo info = region.getAtomicThresholdInfo();
+ if (info.isMemoryThresholdReached()) {
+ InternalResourceManager.getInternalResourceManager(region.getCache()).getHeapMonitor()
+ .updateStateAndSendEvent();
+ Set<DistributedMember> criticalMembers = info.getMembersThatReachedThreshold();
throw new LowMemoryException(
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(function.getId(), htrm),
- htrm);
-
+ .toLocalizedString(function.getId(), criticalMembers),
+ criticalMembers);
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index f6f1098..16f74f8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -20,7 +20,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.execute.Execution;
@@ -34,10 +33,8 @@ import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.SetUtils;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
public class MemberFunctionExecutor extends AbstractExecution {
@@ -161,7 +158,10 @@ public class MemberFunctionExecutor extends AbstractExecution {
@Override
public void validateExecution(final Function function, final Set dest) {
final InternalCache cache = GemFireCacheImpl.getInstance();
- if (cache != null && cache.getTxManager().getTXState() != null) {
+ if (cache == null) {
+ return;
+ }
+ if (cache.getTxManager().getTXState() != null) {
if (dest.size() > 1) {
throw new TransactionException(
LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -179,16 +179,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
}
}
}
- if (function.optimizeForWrite() && cache != null
- && cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
- Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {function.getId(), sm}),
- sm);
- }
+ cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, dest);
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
index 66ccfe8..8be821b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache.execute;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -21,7 +22,6 @@ import java.util.Random;
import java.util.Set;
import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionException;
@@ -34,13 +34,11 @@ import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.SetUtils;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
public class MultiRegionFunctionExecutor extends AbstractExecution {
@@ -207,15 +205,9 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
.toLocalizedString(function.getId()));
}
final InternalCache cache = GemFireCacheImpl.getInstance();
- if (function.optimizeForWrite() && cache != null
- && cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
- Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(function.getId(), sm),
- sm);
+ if (cache != null) {
+ cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function,
+ Collections.unmodifiableSet(dest));
}
setExecutionNodes(dest);
@@ -367,7 +359,10 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
cache = (InternalCache) r.getCache();
break;
}
- if (cache != null && cache.getTxManager().getTXState() != null) {
+ if (cache == null) {
+ return;
+ }
+ if (cache.getTxManager().getTXState() != null) {
if (targetMembers.size() > 1) {
throw new TransactionException(
LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -385,15 +380,6 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
}
}
}
- if (function.optimizeForWrite() && cache.getInternalResourceManager().getHeapMonitor()
- .containsHeapCriticalMembers(targetMembers)
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
- Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(function.getId(), sm),
- sm);
- }
+ cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, targetMembers);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
index 6e13ebc..b3ea80d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.execute;
import java.util.Iterator;
import java.util.Set;
-import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionException;
@@ -26,12 +25,9 @@ import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SetUtils;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
public class PartitionedRegionFunctionExecutor extends AbstractExecution {
@@ -329,7 +325,7 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution {
@Override
public void validateExecution(Function function, Set targetMembers) {
InternalCache cache = pr.getGemFireCache();
- if (cache != null && cache.getTxManager().getTXState() != null) {
+ if (cache.getTxManager().getTXState() != null) {
if (targetMembers.size() > 1) {
throw new TransactionException(
LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -347,15 +343,6 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution {
}
}
}
- if (function.optimizeForWrite() && cache.getInternalResourceManager().getHeapMonitor()
- .containsHeapCriticalMembers(targetMembers)
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<InternalDistributedMember> hcm = cache.getResourceAdvisor().adviseCritialMembers();
- Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {function.getId(), sm}),
- sm);
- }
+ cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, targetMembers);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 7135e66..85a2473 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -454,7 +454,7 @@ public class PartitionedRegionRebalanceOp {
int redundantCopies = leaderRegion.getRedundantCopies();
int totalNumberOfBuckets = leaderRegion.getTotalNumberOfBuckets();
Set<InternalDistributedMember> criticalMembers =
- resourceManager.getResourceAdvisor().adviseCritialMembers();;
+ resourceManager.getResourceAdvisor().adviseCriticalMembers();;
boolean removeOverRedundancy = true;
debug("Building Model for rebalancing " + leaderRegion + ". redundantCopies=" + redundantCopies
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
index c6a9c30..6c8aa33 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
@@ -474,7 +474,7 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
} else {
ResourceAdvisor advisor = getPartitionedRegion().getCache().getResourceAdvisor();
- boolean sick = advisor.adviseCritialMembers().contains(member);
+ boolean sick = advisor.adviseCriticalMembers().contains(member);
if (logger.isDebugEnabled()) {
logger.debug("updateBucketStatus:({}):member:{}:sick:{}",
getPartitionedRegion().bucketStringForLogs(bucketId), member, sick);
@@ -1822,7 +1822,7 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
logger.debug("RA: removing profile {}", profile);
}
if (getAdvisee() instanceof PartitionedRegion) {
- ((PartitionedRegion) getAdvisee()).removeMemberFromCriticalList(profile.peerMemberId);
+ ((PartitionedRegion) getAdvisee()).removeCriticalMember(profile.peerMemberId);
}
if (this.buckets != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
index 1bb0367..7415d37 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
@@ -15,22 +15,15 @@
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
-import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.cache.execute.FunctionContextImpl;
import org.apache.geode.internal.cache.execute.FunctionStats;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -159,16 +152,8 @@ public class ExecuteFunction extends BaseCommand {
+ "with context :" + context.toString());
}
- HeapMemoryMonitor hmm =
- ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
- if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<DistributedMember> sm = Collections.<DistributedMember>singleton(cache.getMyId());
- throw new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {functionObject.getId(), sm}),
- sm);
- }
+ cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(functionObject,
+ cache.getMyId());
functionObject.execute(context);
stats.endFunctionExecution(startExecution, functionObject.hasResult());
} catch (FunctionException functionException) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
index 23e2f17..2e0fa38 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
@@ -15,22 +15,15 @@
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
-import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.FunctionContextImpl;
import org.apache.geode.internal.cache.execute.FunctionStats;
@@ -190,16 +183,9 @@ public class ExecuteFunction65 extends BaseCommand {
context);
}
- HeapMemoryMonitor hmm =
- ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
- if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<DistributedMember> sm = Collections.singleton((DistributedMember) cache.getMyId());
- Exception e = new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {functionObject.getId(), sm}),
- sm);
-
+ Exception e = cache.getInternalResourceManager().getHeapMonitor()
+ .createLowMemoryIfNeeded(functionObject, cache.getMyId());
+ if (e != null) {
sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e);
return;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
index be4749f..6ef3576 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
@@ -15,22 +15,18 @@
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.client.internal.ConnectionImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -39,9 +35,6 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.FunctionContextImpl;
import org.apache.geode.internal.cache.execute.FunctionStats;
@@ -229,16 +222,9 @@ public class ExecuteFunction66 extends BaseCommand {
context);
}
- HeapMemoryMonitor hmm =
- ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
- if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<DistributedMember> sm = Collections.singleton((DistributedMember) cache.getMyId());
- Exception e = new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {functionObject.getId(), sm}),
- sm);
-
+ Exception e = cache.getInternalResourceManager().getHeapMonitor()
+ .createLowMemoryIfNeeded(functionObject, cache.getMyId());
+ if (e != null) {
sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e);
return;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 12f9dfe..2f58510 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
@@ -33,6 +34,7 @@ import org.junit.Test;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.event.BulkOperationHolder;
import org.apache.geode.internal.cache.event.EventTracker;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
@@ -154,4 +156,34 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe
}
}
+ @Test
+ public void testThatMemoryThresholdInfoRelectsStateOfRegion() {
+ InternalDistributedMember internalDM = mock(InternalDistributedMember.class);
+ DistributedRegion distRegion = prepare(true, false);
+ distRegion.addCriticalMember(internalDM);
+
+ MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
+
+ assertThat(distRegion.isMemoryThresholdReached()).isTrue();
+ assertThat(distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
+ .containsExactly(internalDM);
+ assertThat(info.isMemoryThresholdReached()).isTrue();
+ assertThat(info.getMembersThatReachedThreshold()).containsExactly(internalDM);
+ }
+
+ @Test
+ public void testThatMemoryThresholdInfoDoesNotChangeWhenRegionChanges() {
+ InternalDistributedMember internalDM = mock(InternalDistributedMember.class);
+ DistributedRegion distRegion = prepare(true, false);
+
+ MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
+ distRegion.addCriticalMember(internalDM);
+
+ assertThat(distRegion.isMemoryThresholdReached()).isTrue();
+ assertThat(distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
+ .containsExactly(internalDM);
+ assertThat(info.isMemoryThresholdReached()).isFalse();
+ assertThat(info.getMembersThatReachedThreshold()).isEmpty();
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java
new file mode 100644
index 0000000..5a60710
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class MemoryThresholdInfoTest {
+
+ @Test
+ public void getNotReachedReturnsIdenticalResults() {
+ MemoryThresholdInfo info1 = MemoryThresholdInfo.getNotReached();
+ MemoryThresholdInfo info2 = MemoryThresholdInfo.getNotReached();
+
+ assertThat(info1).isSameAs(info2);
+ assertThat(info1.getMembersThatReachedThreshold())
+ .isSameAs(info2.getMembersThatReachedThreshold());
+ }
+
+ @Test
+ public void getNotReachedReturnsEmptyMembersReached() {
+ MemoryThresholdInfo info1 = MemoryThresholdInfo.getNotReached();
+
+ assertThat(info1.getMembersThatReachedThreshold()).isEmpty();
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java
new file mode 100644
index 0000000..f5ddc52
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({MemoryThresholds.class})
+public class HeapMemoryMonitorTest {
+
+ private HeapMemoryMonitor heapMonitor;
+ private Function function;
+ private Set memberSet;
+ private DistributedMember member;
+ private InternalDistributedMember myself;
+ private ResourceAdvisor resourceAdvisor;
+ private static final String LOW_MEMORY_REGEX =
+ "Function: null cannot be executed because the members.*are running low on memory";
+
+ @Before
+ public void setup() {
+ InternalCache internalCache = mock(InternalCache.class);
+ DistributedSystem distributedSystem = mock(DistributedSystem.class);
+ function = mock(Function.class);
+ member = mock(InternalDistributedMember.class);
+ myself = mock(InternalDistributedMember.class);
+ resourceAdvisor = mock(ResourceAdvisor.class);
+
+ when(internalCache.getDistributedSystem()).thenReturn(distributedSystem);
+ when(internalCache.getDistributionAdvisor()).thenReturn(resourceAdvisor);
+ when(internalCache.getMyId()).thenReturn(myself);
+
+ heapMonitor = new HeapMemoryMonitor(null, internalCache, null);
+ memberSet = new HashSet<>();
+ memberSet.add(member);
+ heapMonitor.setMostRecentEvent(new MemoryEvent(InternalResourceManager.ResourceType.HEAP_MEMORY,
+ MemoryThresholds.MemoryState.DISABLED, MemoryThresholds.MemoryState.DISABLED, null, 0L,
+ true, null)); // myself is not critical
+ }
+
+ // ========== tests for getHeapCriticalMembersFrom ==========
+ @Test
+ public void getHeapCriticalMembersFrom_WithEmptyCriticalMembersReturnsEmptySet() {
+ getHeapCriticalMembersFrom_returnsEmptySet(Collections.emptySet(), memberSet);
+ }
+
+ @Test
+ public void getHeapCriticalMembersFrom_WithEmptyArgReturnsEmptySet() {
+ getHeapCriticalMembersFrom_returnsEmptySet(memberSet, Collections.emptySet());
+ }
+
+ @Test
+ public void getHeapCriticalMembersFromWithEmptySetsReturnsEmptySet() {
+ getHeapCriticalMembersFrom_returnsEmptySet(Collections.emptySet(), Collections.emptySet());
+ }
+
+ @Test
+ public void getHeapCriticalMembersFrom_WithDisjointSetsReturnsEmptySet() {
+ Set argSet = new HashSet();
+ argSet.add(mock(InternalDistributedMember.class));
+
+ getHeapCriticalMembersFrom_returnsEmptySet(memberSet, argSet);
+ }
+
+ @Test
+ public void getHeapCriticalMembersFrom_WithEqualSetsReturnsMember() {
+ getHeapCriticalMembersFrom_returnsNonEmptySet(memberSet, Collections.unmodifiableSet(memberSet),
+ new HashSet(memberSet));
+ }
+
+ @Test
+ public void getHeapCriticalMembersFrom_ReturnsMultipleMembers() {
+ DistributedMember member1 = mock(InternalDistributedMember.class);
+ DistributedMember member2 = mock(InternalDistributedMember.class);
+ DistributedMember member3 = mock(InternalDistributedMember.class);
+ DistributedMember member4 = mock(InternalDistributedMember.class);
+ Set advisorSet = new HashSet();
+ advisorSet.add(member1);
+ advisorSet.add(member2);
+ advisorSet.add(member4);
+ Set argSet = new HashSet(memberSet);
+ argSet.add(member1);
+ argSet.add(member3);
+ argSet.add(member4);
+ Set expectedResult = new HashSet();
+ expectedResult.add(member1);
+ expectedResult.add(member4);
+
+ getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet, argSet, expectedResult);
+ }
+
+ @Test
+ public void getHeapCriticalMembersFrom_DoesNotReturnMyselfWhenNotCritical() {
+ Set expectedResult = new HashSet(memberSet);
+ Set advisorSet = new HashSet(memberSet);
+ memberSet.add(myself);
+
+ getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet,
+ Collections.unmodifiableSet(memberSet),
+ expectedResult);
+ }
+
+ @Test
+ public void getHeapCriticalMembersFrom_IncludesMyselfWhenCritical() throws Exception {
+ Set advisorSet = new HashSet(memberSet);
+ heapMonitor.setMostRecentEvent(new MemoryEvent(InternalResourceManager.ResourceType.HEAP_MEMORY,
+ MemoryThresholds.MemoryState.DISABLED, MemoryThresholds.MemoryState.CRITICAL, null, 0L,
+ true, null));
+ memberSet.add(myself);
+
+ getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet,
+ Collections.unmodifiableSet(memberSet),
+ new HashSet(memberSet));
+ }
+
+ // ========== tests for createLowMemoryIfNeeded (with Set argument) ==========
+ @Test
+ public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenNotOptimizedForWrite()
+ throws Exception {
+ createLowMemoryIfNeededWithSetArg_returnsNull(false, false, memberSet);
+ }
+
+ @Test
+ public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+ throws Exception {
+ createLowMemoryIfNeededWithSetArg_returnsNull(true, true, memberSet);
+ }
+
+ @Test
+ public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenNoCriticalMembers()
+ throws Exception {
+ createLowMemoryIfNeededWithSetArg_returnsNull(true, false, Collections.emptySet());
+ }
+
+ @Test
+ public void createLowMemoryIfNeededWithSetArg_ReturnsException() throws Exception {
+ setMocking(true, false, memberSet);
+
+ LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberSet);
+
+ assertLowMemoryException(exception);
+ }
+
+ // ========== tests for createLowMemoryIfNeeded (with DistributedMember argument) ==========
+ /**
+  * Verifies that {@code createLowMemoryIfNeeded(function, member)} returns {@code null} when
+  * {@code function.optimizeForWrite()} is stubbed to {@code false}, even though {@code memberSet}
+  * is advised as critical.
+  *
+  * <p>
+  * {@code memberSet} is advised as critical on purpose. The shared
+  * {@code createLowMemoryIfNeededWithMemberArg_returnsNull} helper always advises an empty
+  * critical set, and with an empty set the call already returns {@code null} for a
+  * write-optimized function (see the NoCriticalMembers test), so going through the helper could
+  * not detect a missing optimizeForWrite check. With the same advised set and a write-optimized
+  * function the call produces an exception (see
+  * {@code createLowMemoryIfNeededWithMemberArg_ReturnsException}).
+  */
+ @Test
+ public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNotOptimizedForWrite()
+     throws Exception {
+   setMocking(false, false, memberSet);
+
+   LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, member);
+
+   assertThat(exception).isNull();
+ }
+
+ /**
+  * Verifies that {@code createLowMemoryIfNeeded(function, member)} returns {@code null} when
+  * {@code MemoryThresholds.isLowMemoryExceptionDisabled()} is stubbed to {@code true}, even
+  * though the function is write-optimized and {@code memberSet} is advised as critical.
+  *
+  * <p>
+  * {@code memberSet} is advised as critical on purpose. The shared
+  * {@code createLowMemoryIfNeededWithMemberArg_returnsNull} helper always advises an empty
+  * critical set, and with an empty set the call already returns {@code null} while the exception
+  * is enabled (see the NoCriticalMembers test), so going through the helper could not detect a
+  * missing "disabled" check. With the same advised set and the exception enabled the call
+  * produces an exception (see {@code createLowMemoryIfNeededWithMemberArg_ReturnsException}).
+  */
+ @Test
+ public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+     throws Exception {
+   setMocking(true, true, memberSet);
+
+   LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, member);
+
+   assertThat(exception).isNull();
+ }
+
+ /**
+  * With a write-optimized function and the low-memory exception enabled, but an empty advised
+  * critical set (supplied by the helper), {@code createLowMemoryIfNeeded(function, member)} must
+  * return {@code null}.
+  */
+ @Test
+ public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNoCriticalMembers()
+     throws Exception {
+   final boolean optimizeForWrite = true;
+   final boolean lowMemoryExceptionDisabled = false;
+
+   createLowMemoryIfNeededWithMemberArg_returnsNull(optimizeForWrite, lowMemoryExceptionDisabled,
+       member);
+ }
+
+ /**
+  * With a write-optimized function, the low-memory exception enabled, and {@code memberSet}
+  * advised as critical, {@code createLowMemoryIfNeeded(function, member)} must return a
+  * {@code LowMemoryException} that satisfies {@code assertLowMemoryException}.
+  */
+ @Test
+ public void createLowMemoryIfNeededWithMemberArg_ReturnsException() throws Exception {
+   final boolean optimizeForWrite = true;
+   final boolean lowMemoryExceptionDisabled = false;
+   setMocking(optimizeForWrite, lowMemoryExceptionDisabled, memberSet);
+
+   LowMemoryException created = heapMonitor.createLowMemoryIfNeeded(function, member);
+
+   assertLowMemoryException(created);
+ }
+
+ // ========== tests for checkForLowMemory (with Set argument) ==========
+ /**
+  * With {@code function.optimizeForWrite()} stubbed to {@code false} and {@code memberSet}
+  * advised as critical, the Set overload of {@code checkForLowMemory} must complete without
+  * throwing.
+  */
+ @Test
+ public void checkForLowMemoryWithSetArg_DoesNotThrowWhenNotOptimizedForWrite() throws Exception {
+   final boolean optimizeForWrite = false;
+   final boolean lowMemoryExceptionDisabled = false;
+
+   checkForLowMemoryWithSetArg_doesNotThrow(optimizeForWrite, lowMemoryExceptionDisabled,
+       memberSet);
+ }
+
+ /**
+  * With {@code MemoryThresholds.isLowMemoryExceptionDisabled()} stubbed to {@code true}, a
+  * write-optimized function, and {@code memberSet} advised as critical, the Set overload of
+  * {@code checkForLowMemory} must complete without throwing.
+  */
+ @Test
+ public void checkForLowMemoryWithSetArg_DoesNotThrowWhenLowMemoryExceptionDisabled()
+     throws Exception {
+   final boolean optimizeForWrite = true;
+   final boolean lowMemoryExceptionDisabled = true;
+
+   checkForLowMemoryWithSetArg_doesNotThrow(optimizeForWrite, lowMemoryExceptionDisabled,
+       memberSet);
+ }
+
+ @Test
+ public void checkForLowMemoryWithSetArg_DoesNotThrowWhenNoCriticalMembers() throws Exception {
+ checkForLowMemoryWithSetArg_doesNotThrow(true, false, Collections.emptySet());
+ }
+
+ /**
+  * With a write-optimized function, the low-memory exception enabled, and {@code memberSet}
+  * advised as critical, the Set overload of {@code checkForLowMemory} must throw exactly a
+  * {@code LowMemoryException}. The exception message is not inspected here.
+  */
+ @Test
+ public void checkForLowMemoryWithSetArg_ThrowsLowMemoryException() throws Exception {
+   final boolean optimizeForWrite = true;
+   final boolean lowMemoryExceptionDisabled = false;
+   setMocking(optimizeForWrite, lowMemoryExceptionDisabled, memberSet);
+
+   assertThatThrownBy(() -> {
+     heapMonitor.checkForLowMemory(function, memberSet);
+   }).isExactlyInstanceOf(LowMemoryException.class);
+ }
+
+ // ========== tests for checkForLowMemory (with DistributedMember argument) ==========
+ /**
+  * Verifies that {@code checkForLowMemory(function, member)} does not throw when
+  * {@code function.optimizeForWrite()} is stubbed to {@code false}, even though {@code memberSet}
+  * is advised as critical. Despite the "ReturnsNull" in the name, no return value is inspected;
+  * the test passes when no exception propagates.
+  *
+  * <p>
+  * {@code memberSet} is advised as critical on purpose. The shared
+  * {@code checkForLowMemoryWithMemberArg_doesNotThrow} helper always advises an empty critical
+  * set, and with an empty set the call already does not throw for a write-optimized function
+  * (see the NoCriticalMembers test), so going through the helper could not detect a missing
+  * optimizeForWrite check. With the same advised set and a write-optimized function the call
+  * throws (see {@code checkForLowMemoryWithMemberArg_ReturnsException}).
+  */
+ @Test
+ public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNotOptimizedForWrite()
+     throws Exception {
+   setMocking(false, false, memberSet);
+
+   heapMonitor.checkForLowMemory(function, member);
+ }
+
+ /**
+  * Verifies that {@code checkForLowMemory(function, member)} does not throw when
+  * {@code MemoryThresholds.isLowMemoryExceptionDisabled()} is stubbed to {@code true}, even
+  * though the function is write-optimized and {@code memberSet} is advised as critical. Despite
+  * the "ReturnsNull" in the name, no return value is inspected; the test passes when no
+  * exception propagates.
+  *
+  * <p>
+  * {@code memberSet} is advised as critical on purpose. The shared
+  * {@code checkForLowMemoryWithMemberArg_doesNotThrow} helper always advises an empty critical
+  * set, and with an empty set the call already does not throw while the exception is enabled
+  * (see the NoCriticalMembers test), so going through the helper could not detect a missing
+  * "disabled" check. With the same advised set and the exception enabled the call throws (see
+  * {@code checkForLowMemoryWithMemberArg_ReturnsException}).
+  */
+ @Test
+ public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+     throws Exception {
+   setMocking(true, true, memberSet);
+
+   heapMonitor.checkForLowMemory(function, member);
+ }
+
+ /**
+  * With a write-optimized function and the low-memory exception enabled, but an empty advised
+  * critical set (supplied by the helper), {@code checkForLowMemory(function, member)} must
+  * complete without throwing. No return value is inspected despite the "ReturnsNull" in the name.
+  */
+ @Test
+ public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNoCriticalMembers()
+     throws Exception {
+   final boolean optimizeForWrite = true;
+   final boolean lowMemoryExceptionDisabled = false;
+
+   checkForLowMemoryWithMemberArg_doesNotThrow(optimizeForWrite, lowMemoryExceptionDisabled,
+       member);
+ }
+
+ /**
+  * With a write-optimized function, the low-memory exception enabled, and {@code memberSet}
+  * advised as critical, {@code checkForLowMemory(function, member)} must throw exactly a
+  * {@code LowMemoryException} whose message matches {@code LOW_MEMORY_REGEX}. Despite the
+  * "ReturnsException" in the name, the exception is thrown, not returned.
+  */
+ @Test
+ public void checkForLowMemoryWithMemberArg_ReturnsException() throws Exception {
+   final boolean optimizeForWrite = true;
+   final boolean lowMemoryExceptionDisabled = false;
+   setMocking(optimizeForWrite, lowMemoryExceptionDisabled, memberSet);
+
+   assertThatThrownBy(() -> {
+     heapMonitor.checkForLowMemory(function, member);
+   })
+       .isExactlyInstanceOf(LowMemoryException.class)
+       .hasMessageMatching(LOW_MEMORY_REGEX);
+ }
+
+ // ========== private methods ==========
+ private void getHeapCriticalMembersFrom_returnsEmptySet(Set adviseCriticalMembers, Set argSet) {
+ when(resourceAdvisor.adviseCriticalMembers()).thenReturn(adviseCriticalMembers);
+
+ Set<DistributedMember> criticalMembers = heapMonitor.getHeapCriticalMembersFrom(argSet);
+
+ assertThat(criticalMembers).isEmpty();
+ }
+
+ private void getHeapCriticalMembersFrom_returnsNonEmptySet(Set adviseCriticalMembers, Set argSet,
+ Set expectedResult) {
+ when(resourceAdvisor.adviseCriticalMembers()).thenReturn(adviseCriticalMembers);
+
+ Set<DistributedMember> criticalMembers = heapMonitor.getHeapCriticalMembersFrom(argSet);
+
+ assertThat(criticalMembers).containsAll(expectedResult);
+ }
+
+ private void createLowMemoryIfNeededWithSetArg_returnsNull(boolean optimizeForWrite,
+ boolean isLowMemoryExceptionDisabled, Set memberSetArg) throws Exception {
+ setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, memberSetArg);
+
+ LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberSetArg);
+
+ assertThat(exception).isNull();
+ }
+
+ private void createLowMemoryIfNeededWithMemberArg_returnsNull(boolean optimizeForWrite,
+ boolean isLowMemoryExceptionDisabled, DistributedMember memberArg) throws Exception {
+ setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, Collections.emptySet());
+
+ LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberArg);
+
+ assertThat(exception).isNull();
+ }
+
+ /**
+  * Arranges the mocks via {@code setMocking} and calls the Set overload of
+  * {@code checkForLowMemory}; the calling test passes when no exception propagates.
+  *
+  * @param optimizeForWrite value stubbed for {@code function.optimizeForWrite()}
+  * @param isLowMemoryExceptionDisabled value stubbed for
+  *        {@code MemoryThresholds.isLowMemoryExceptionDisabled()}
+  * @param memberSetArg used both as the advised critical set and as the argument of the call
+  */
+ private void checkForLowMemoryWithSetArg_doesNotThrow(boolean optimizeForWrite,
+     boolean isLowMemoryExceptionDisabled, Set memberSetArg) throws Exception {
+   setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, memberSetArg);
+
+   // No explicit assertion: an exception thrown here fails the calling test.
+   heapMonitor.checkForLowMemory(function, memberSetArg);
+ }
+
+ private void checkForLowMemoryWithMemberArg_doesNotThrow(boolean optimizeForWrite,
+ boolean isLowMemoryExceptionDisabled, DistributedMember memberArg) throws Exception {
+ setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, Collections.emptySet());
+
+ heapMonitor.checkForLowMemory(function, memberArg);
+ }
+
+ /**
+  * Arranges the three inputs the low-memory tests vary.
+  *
+  * @param optimizeForWrite value the {@code function} mock returns from {@code optimizeForWrite()}
+  * @param isLowMemoryExceptionDisabled value handed to {@code setIsLowMemoryExceptionDisabled}
+  * @param argSet set the {@code resourceAdvisor} mock returns from
+  *        {@code adviseCriticalMembers()}
+  */
+ private void setMocking(boolean optimizeForWrite, boolean isLowMemoryExceptionDisabled,
+ Set argSet) throws Exception {
+ when(function.optimizeForWrite()).thenReturn(optimizeForWrite);
+ setIsLowMemoryExceptionDisabled(isLowMemoryExceptionDisabled);
+ when(resourceAdvisor.adviseCriticalMembers()).thenReturn(argSet);
+ }
+
+ private void assertLowMemoryException(LowMemoryException exception) {
+ assertThat(exception).isExactlyInstanceOf(LowMemoryException.class);
+ assertThat(exception.getMessage()).containsPattern(LOW_MEMORY_REGEX);
+ }
+
+ /**
+  * Statically mocks {@code MemoryThresholds} and stubs
+  * {@code MemoryThresholds.isLowMemoryExceptionDisabled()} to return the given value.
+  *
+  * <p>
+  * The stubbing uses the plain {@code when(methodCall)} form. The
+  * {@code PowerMockito.when(Class, Object...)} overload treats its trailing arguments as the
+  * arguments of a method to look up on the class, so passing the result of
+  * {@code isLowMemoryExceptionDisabled()} there does not name the method being stubbed.
+  *
+  * @param isLowMemoryExceptionDisabled the value the static method should return
+  * @throws Exception kept for signature compatibility with existing callers
+  */
+ private void setIsLowMemoryExceptionDisabled(boolean isLowMemoryExceptionDisabled)
+     throws Exception {
+   PowerMockito.mockStatic(MemoryThresholds.class);
+   PowerMockito.when(MemoryThresholds.isLowMemoryExceptionDisabled())
+       .thenReturn(isLowMemoryExceptionDisabled);
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
index a61e49f..fc234c2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
@@ -45,6 +45,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
@@ -124,6 +125,7 @@ public class ExecuteFunction66Test {
when(this.cache.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
when(this.cache.getDistributedSystem()).thenReturn(mock(InternalDistributedSystem.class));
when(this.cache.getResourceManager()).thenReturn(this.internalResourceManager);
+ when(this.cache.getInternalResourceManager()).thenReturn(this.internalResourceManager);
when(this.callbackArgPart.getObject()).thenReturn(CALLBACK_ARG);
@@ -149,6 +151,8 @@ public class ExecuteFunction66Test {
when(this.serverConnection.getAcceptor()).thenReturn(this.acceptor);
when(this.serverConnection.getHandshake()).thenReturn(mock(ServerSideHandshake.class));
+ when(this.internalResourceManager.getHeapMonitor()).thenReturn(mock(HeapMemoryMonitor.class));
+
PowerMockito.mockStatic(FunctionService.class);
PowerMockito.when(FunctionService.getFunction(eq(FUNCTION))).thenReturn(this.functionObject);
}