You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ma...@apache.org on 2015/11/23 21:49:10 UTC
[40/50] [abbrv] incubator-geode git commit: GEODE-438: fix race in
OffHeapMemoryMonitor. This fix also fixes GEODE-488 and GEODE-551. Also fixed
test to use standard locators for connectivity instead of an mcast-port.
GEODE-438: fix race in OffHeapMemoryMonitor
This fix also fixes GEODE-488 and GEODE-551.
Also fixed test to use standard locators for connectivity
instead of an mcast-port.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/59ab8cf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/59ab8cf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/59ab8cf5
Branch: refs/heads/feature/GEODE-53
Commit: 59ab8cf5aef00dddb9587ae9b7964de277630d35
Parents: 4b904ce
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Nov 17 17:20:59 2015 -0800
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Nov 19 14:35:12 2015 -0800
----------------------------------------------------------------------
.../cache/control/InternalResourceManager.java | 2 +-
.../cache/control/OffHeapMemoryMonitor.java | 137 +++++++++----------
.../MemoryThresholdsOffHeapDUnitTest.java | 17 ++-
3 files changed, 74 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/59ab8cf5/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
index 81d46ff..98e1f25 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
@@ -165,7 +165,7 @@ public class InternalResourceManager implements ResourceManager {
// Create the monitors
Map<ResourceType, ResourceMonitor> tempMonitors = new HashMap<ResourceType, ResourceMonitor>();
tempMonitors.put(ResourceType.HEAP_MEMORY, new HeapMemoryMonitor(this, cache, this.stats));
- tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, this.stats));
+ tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, cache.getOffHeapStore(), this.stats));
this.resourceMonitors = Collections.unmodifiableMap(tempMonitors);
// Initialize the listener sets so that it only needs to be done once
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/59ab8cf5/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
index 721e9a6..3ab39ea 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
@@ -48,9 +48,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
MemoryState.DISABLED, null, 0L, true, this.thresholds);
private volatile MemoryState currentState = MemoryState.DISABLED;
- // This holds a new event as it transitions from updateStateAndSendEvent(...) to fillInProfile()
- private ThreadLocal<MemoryEvent> upcomingEvent = new ThreadLocal<MemoryEvent>();
-
// Set when startMonitoring() and stopMonitoring() are called
// Package private for testing
Boolean started = false;
@@ -58,28 +55,33 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
// Set to true when setEvictionThreshold(...) is called.
private boolean hasEvictionThreshold = false;
- private OffHeapMemoryUsageListener offHeapMemoryUsageListener;
private Thread memoryListenerThread;
+ private final OffHeapMemoryUsageListener offHeapMemoryUsageListener;
private final InternalResourceManager resourceManager;
private final ResourceAdvisor resourceAdvisor;
private final GemFireCacheImpl cache;
private final ResourceManagerStats stats;
+ /**
+ * InternalResourceManager insists on creating an OffHeapMemoryMonitor even when it
+ * does not have off-heap memory. So we need to handle memoryAllocator being null.
+ */
private final MemoryAllocator memoryAllocator;
private final LogWriterI18n log;
- OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final ResourceManagerStats stats) {
+ OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final MemoryAllocator memoryAllocator, final ResourceManagerStats stats) {
this.resourceManager = resourceManager;
this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
this.cache = cache;
this.stats = stats;
- this.memoryAllocator = cache.getOffHeapStore();
- if (this.memoryAllocator != null) {
+ this.memoryAllocator = memoryAllocator;
+ if (memoryAllocator != null) {
this.thresholds = new MemoryThresholds(this.memoryAllocator.getTotalMemory());
}
this.log = cache.getLoggerI18n();
+ this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener();
}
/**
@@ -92,7 +94,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
return;
}
- this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener(getBytesUsed());
ThreadGroup group = LoggingThreadGroup.createThreadGroup("OffHeapMemoryMonitor Threads", logger);
Thread t = new Thread(group, this.offHeapMemoryUsageListener);
t.setName(t.getName() + " OffHeapMemoryListener");
@@ -123,10 +124,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
this.memoryAllocator.removeMemoryUsageListener(this);
- this.offHeapMemoryUsageListener.stopRequested = true;
- synchronized (this.offHeapMemoryUsageListener) {
- this.offHeapMemoryUsageListener.notifyAll();
- }
+ this.offHeapMemoryUsageListener.stop();
if (waitForThread) {
threadToWaitFor = this.memoryListenerThread;
}
@@ -177,10 +175,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
if (!willSendEvent) {
return;
}
- synchronized (this.offHeapMemoryUsageListener) {
- this.offHeapMemoryUsageListener.offHeapMemoryUsed = bytesUsed;
- this.offHeapMemoryUsageListener.notifyAll();
- }
+ this.offHeapMemoryUsageListener.deliverEvent();
if (_testHook != null) {
_testHook.afterNotifyUpdateMemoryUsed(bytesUsed);
}
@@ -281,9 +276,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
/**
* Compare the number of bytes used (fetched from the JVM) to the thresholds.
* If necessary, change the state and send an event for the state change.
+ * @return true if an event was sent
*/
- public void updateStateAndSendEvent() {
- updateStateAndSendEvent(getBytesUsed());
+ public boolean updateStateAndSendEvent() {
+ return updateStateAndSendEvent(getBytesUsed());
}
/**
@@ -294,8 +290,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
*
* @param bytesUsed
* Number of bytes of off-heap memory currently used.
+ * @return true if an event was sent
*/
- public void updateStateAndSendEvent(long bytesUsed) {
+ public boolean updateStateAndSendEvent(long bytesUsed) {
+ boolean result = false;
synchronized (this) {
final MemoryEvent mre = this.mostRecentEvent;
final MemoryState oldState = mre.getState();
@@ -309,10 +307,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
if (_testHook != null) {
_testHook.updateStateAndSendEventBeforeProcess(bytesUsed, event);
}
- this.upcomingEvent.set(event);
-
+ this.mostRecentEvent = event;
processLocalEvent(event);
updateStatsFromEvent(event);
+ result = true;
} else if (!oldState.isNormal()
&& bytesUsed != mre.getBytesUsed()
@@ -322,8 +320,9 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
if (_testHook != null) {
_testHook.updateStateAndSendEventBeforeAbnormalProcess(bytesUsed, event);
}
- this.upcomingEvent.set(event);
+ this.mostRecentEvent = event;
processLocalEvent(event);
+ result = true;
} else {
if (_testHook != null) {
_testHook.updateStateAndSendEventIgnore(bytesUsed, oldState, newState, mre.getBytesUsed(), this.deliverNextAbnormalEvent);
@@ -331,6 +330,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
}
}
+ return result;
}
/**
@@ -344,7 +344,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
private boolean mightSendEvent(long bytesUsed) {
final MemoryEvent mre = this.mostRecentEvent;
final MemoryState oldState = mre.getState();
- final MemoryThresholds thresholds = this.thresholds;
+ final MemoryThresholds thresholds = mre.getThresholds();
MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
if (oldState != newState) {
return true;
@@ -400,11 +400,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
*/
@Override
public void fillInProfile(final ResourceManagerProfile profile) {
- final MemoryEvent tempEvent = this.upcomingEvent.get();
- if (tempEvent != null) {
- this.mostRecentEvent = tempEvent;
- this.upcomingEvent.set(null);
- }
final MemoryEvent eventToPopulate = this.mostRecentEvent;
profile.setOffHeapData(eventToPopulate.getBytesUsed(), eventToPopulate.getState(), eventToPopulate.getThresholds());
}
@@ -428,7 +423,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
if (this.memoryAllocator == null) {
return 0;
}
-
return this.memoryAllocator.getUsedMemory();
}
@@ -496,65 +490,60 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
}
class OffHeapMemoryUsageListener implements Runnable {
- volatile boolean stopRequested = false;
- long offHeapMemoryUsed; // In bytes
+ private boolean deliverEvent = false;
+ private boolean stopRequested = false;
- OffHeapMemoryUsageListener(final long offHeapMemoryUsed) {
- this.offHeapMemoryUsed = offHeapMemoryUsed;
+ OffHeapMemoryUsageListener() {
+ }
+
+ public synchronized void deliverEvent() {
+ this.deliverEvent = true;
+ this.notifyAll();
+ }
+
+ public synchronized void stop() {
+ this.stopRequested = true;
+ this.notifyAll();
}
@Override
public void run() {
getLogWriter().fine("OffHeapMemoryUsageListener is starting " + this);
- long lastOffHeapMemoryUsed;
- synchronized (this) {
- lastOffHeapMemoryUsed = this.offHeapMemoryUsed;
- }
- while (!this.stopRequested) {
- updateStateAndSendEvent(lastOffHeapMemoryUsed);
+ int callsWithNoEvent = 0;
+ final int MS_TIMEOUT = 10;
+ final int MAX_CALLS_WITH_NO_EVENT = 1000/MS_TIMEOUT;
+ boolean exitRunLoop = false;
+ while (!exitRunLoop) {
+ if (!updateStateAndSendEvent()) {
+ callsWithNoEvent++;
+ if (callsWithNoEvent > MAX_CALLS_WITH_NO_EVENT) {
+ deliverNextAbnormalEvent();
+ callsWithNoEvent = 0;
+ }
+ } else {
+ callsWithNoEvent = 0;
+ }
synchronized (this) {
- long newOffHeapMemoryUsed = this.offHeapMemoryUsed;
if (this.stopRequested) {
- // no need to wait since we are stopping
- } else if (lastOffHeapMemoryUsed != newOffHeapMemoryUsed) {
- // no need to wait since memory used has changed
- // This fixes a race like bug GEODE-500
- lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
+ exitRunLoop = true;
+ } else if (this.deliverEvent) {
+ // No need to wait.
+ // Loop around and call updateStateAndSendEvent.
+ this.deliverEvent = false;
} else {
- // wait for memory used to change
+ // Wait to be notified that off-heap memory changed
+ // or for the wait to timeout.
+ // In some cases we need to generate an event even
+ // when we have not been notified (see GEODE-438).
+ // So we don't want this wait to be for very long.
try {
- do {
- this.wait(1000);
- newOffHeapMemoryUsed = this.offHeapMemoryUsed;
- if (newOffHeapMemoryUsed == lastOffHeapMemoryUsed) {
- // The wait timed out. So tell the OffHeapMemoryMonitor
- // that we need an event if the state is not normal.
- deliverNextAbnormalEvent();
- // TODO: don't we need a "break" here?
- // As it is we set deliverNextAbnormalEvent
- // but then go back to sleep in wait.
- // We need to call updateStateAndSendEvent
- // which tests deliverNextAbnormalEvent.
- // But just adding a break is probably not enough.
- // We only set deliverNextAbnormalEvent if the wait
- // timed out which means that the amount of offHeapMemoryUsed
- // did not change.
- // But in updateStateAndSendEvent we only deliver an
- // abnormal event if the amount of memory changed.
- // This code needs to be reviewed with Swapnil but
- // it looks to Darrel like deliverNextAbnormalEvent
- // can be removed.
- } else {
- // we have been notified so exit the inner while loop
- // and call updateStateAndSendEvent.
- lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
- break;
- }
- } while (!this.stopRequested);
+ this.wait(MS_TIMEOUT);
+ this.deliverEvent = false;
} catch (InterruptedException iex) {
getLogWriter().warning("OffHeapMemoryUsageListener was interrupted " + this);
this.stopRequested = true;
+ exitRunLoop = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/59ab8cf5/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
index a6f24d0..5399d28 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
@@ -524,7 +524,6 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
final VM replicate1 = host.getVM(1);
final VM replicate2 = host.getVM(2);
final String rName = getUniqueName();
- final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
// Make sure the desired VMs will have a fresh DS.
AsyncInvocation d1 = replicate1.invokeAsync(DistributedTestCase.class, "disconnectFromDS");
@@ -537,7 +536,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
@SuppressWarnings("synthetic-access")
@Override
public void run2() throws CacheException {
- getSystem(getServerProperties(mcastPort));
+ getSystem(getOffHeapProperties());
}
};
replicate1.invoke(establishConnectivity);
@@ -547,7 +546,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
@Override
public void run2() throws CacheException {
// Assert some level of connectivity
- InternalDistributedSystem ds = getSystem(getServerProperties(mcastPort));
+ InternalDistributedSystem ds = getSystem(getOffHeapProperties());
assertTrue(ds.getDistributionManager().getNormalDistributionManagerIds().size() >= 1);
InternalResourceManager irm = (InternalResourceManager)getCache().getResourceManager();
@@ -1156,14 +1155,13 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
final Host host = Host.getHost(0);
final VM vm = host.getVM(2);
final String rName = getUniqueName();
- final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
vm.invoke(DistributedTestCase.class, "disconnectFromDS");
vm.invoke(new CacheSerializableRunnable("test LocalRegion load passthrough when critical") {
@Override
public void run2() throws CacheException {
- getSystem(getServerProperties(mcastPort));
+ getSystem(getOffHeapProperties());
InternalResourceManager irm = (InternalResourceManager)getCache().getResourceManager();
final OffHeapMemoryMonitor ohmm = irm.getOffHeapMonitor();
irm.setCriticalOffHeapPercentage(90f);
@@ -1821,12 +1819,17 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
});
}
- private Properties getServerProperties(int mcastPort) {
+ private Properties getOffHeapProperties() {
Properties p = new Properties();
+ p.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "1m");
+ return p;
+ }
+
+ private Properties getServerProperties(int mcastPort) {
+ Properties p = getOffHeapProperties();
p.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastPort + "");
p.setProperty(DistributionConfig.MCAST_TTL_NAME, "0");
p.setProperty(DistributionConfig.LOCATORS_NAME, "");
- p.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "1m");
return p;
}