You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/11/19 03:05:03 UTC
incubator-geode git commit: Listener thread will now wake up
frequently and check if an event is needed
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-438 04afd82bc -> 1d0ea2458
Listener thread will now wake up frequently and check if an event is needed
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1d0ea245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1d0ea245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1d0ea245
Branch: refs/heads/feature/GEODE-438
Commit: 1d0ea245867fac19c0cbd58423d0fe19d4798945
Parents: 04afd82
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Nov 18 18:03:49 2015 -0800
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Nov 18 18:03:49 2015 -0800
----------------------------------------------------------------------
.../cache/control/InternalResourceManager.java | 3 +-
.../cache/control/OffHeapMemoryMonitor.java | 140 +++++++------------
.../gemfire/internal/i18n/LocalizedStrings.java | 2 -
3 files changed, 52 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1d0ea245/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
index 81d46ff..e7b514c 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
@@ -54,6 +54,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
/**
* Implementation of ResourceManager with additional internal-only methods.
@@ -165,7 +166,7 @@ public class InternalResourceManager implements ResourceManager {
// Create the monitors
Map<ResourceType, ResourceMonitor> tempMonitors = new HashMap<ResourceType, ResourceMonitor>();
tempMonitors.put(ResourceType.HEAP_MEMORY, new HeapMemoryMonitor(this, cache, this.stats));
- tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, this.stats));
+ tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, cache.getOffHeapStore(), this.stats));
this.resourceMonitors = Collections.unmodifiableMap(tempMonitors);
// Initialize the listener sets so that it only needs to be done once
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1d0ea245/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
index b2b56dc..4b28ddd 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
@@ -48,9 +48,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
MemoryState.DISABLED, null, 0L, true, this.thresholds);
private volatile MemoryState currentState = MemoryState.DISABLED;
- // This holds a new event as it transitions from updateStateAndSendEvent(...) to fillInProfile()
- private ThreadLocal<MemoryEvent> upcomingEvent = new ThreadLocal<MemoryEvent>();
-
// Set when startMonitoring() and stopMonitoring() are called
// Package private for testing
Boolean started = false;
@@ -65,22 +62,26 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
private final ResourceAdvisor resourceAdvisor;
private final GemFireCacheImpl cache;
private final ResourceManagerStats stats;
+ /**
+ * InternalResourceManager insists on creating an OffHeapMemoryMonitor even when it
+ * does not have off-heap memory. So we need to handle memoryAllocator being null.
+ */
private final MemoryAllocator memoryAllocator;
private final LogWriterI18n log;
- OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final ResourceManagerStats stats) {
+ OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final MemoryAllocator memoryAllocator, final ResourceManagerStats stats) {
this.resourceManager = resourceManager;
this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
this.cache = cache;
this.stats = stats;
- this.memoryAllocator = cache.getOffHeapStore();
- if (this.memoryAllocator != null) {
+ this.memoryAllocator = memoryAllocator;
+ if (memoryAllocator != null) {
this.thresholds = new MemoryThresholds(this.memoryAllocator.getTotalMemory());
}
this.log = cache.getLoggerI18n();
- this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener(0L);
+ this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener();
}
/**
@@ -93,7 +94,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
return;
}
- this.offHeapMemoryUsageListener.offHeapMemoryUsed = getBytesUsed();
ThreadGroup group = LoggingThreadGroup.createThreadGroup("OffHeapMemoryMonitor Threads", logger);
Thread t = new Thread(group, this.offHeapMemoryUsageListener);
t.setName(t.getName() + " OffHeapMemoryListener");
@@ -124,10 +124,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
this.memoryAllocator.removeMemoryUsageListener(this);
- this.offHeapMemoryUsageListener.stopRequested = true;
- synchronized (this.offHeapMemoryUsageListener) {
- this.offHeapMemoryUsageListener.notifyAll();
- }
+ this.offHeapMemoryUsageListener.stop();
if (waitForThread) {
threadToWaitFor = this.memoryListenerThread;
}
@@ -179,7 +176,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
return;
}
synchronized (this.offHeapMemoryUsageListener) {
- this.offHeapMemoryUsageListener.offHeapMemoryUsed = bytesUsed;
this.offHeapMemoryUsageListener.notifyAll();
}
if (_testHook != null) {
@@ -199,10 +195,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_CRITICAL_PERCENTAGE_GT_ZERO_AND_LTE_100
.toLocalizedString());
}
- if (this.memoryAllocator == null) {
- throw new IllegalStateException(LocalizedStrings.OffHeapMemoryMonitor_NO_OFF_HEAP_MEMORY_HAS_BEEN_CONFIGURED
- .toLocalizedString());
- }
if (criticalThreshold != 0 && this.thresholds.isEvictionThresholdEnabled()
&& criticalThreshold <= this.thresholds.getEvictionThreshold()) {
throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_CRITICAL_PERCENTAGE_GTE_EVICTION_PERCENTAGE
@@ -249,10 +241,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_EVICTION_PERCENTAGE_GT_ZERO_AND_LTE_100
.toLocalizedString());
}
- if (this.memoryAllocator == null) {
- throw new IllegalStateException(LocalizedStrings.OffHeapMemoryMonitor_NO_OFF_HEAP_MEMORY_HAS_BEEN_CONFIGURED
- .toLocalizedString());
- }
if (evictionThreshold != 0 && this.thresholds.isCriticalThresholdEnabled()
&& evictionThreshold >= this.thresholds.getCriticalThreshold()) {
throw new IllegalArgumentException(LocalizedStrings.MemoryMonitor_EVICTION_PERCENTAGE_LTE_CRITICAL_PERCENTAGE
@@ -282,9 +270,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
/**
* Compare the number of bytes used (fetched from the JVM) to the thresholds.
* If necessary, change the state and send an event for the state change.
+ * @return true if an event was sent
*/
- public void updateStateAndSendEvent() {
- updateStateAndSendEvent(getBytesUsed());
+ public boolean updateStateAndSendEvent() {
+ return updateStateAndSendEvent(getBytesUsed());
}
/**
@@ -295,8 +284,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
*
* @param bytesUsed
* Number of bytes of off-heap memory currently used.
+ * @return true if an event was sent
*/
- public void updateStateAndSendEvent(long bytesUsed) {
+ public boolean updateStateAndSendEvent(long bytesUsed) {
+ boolean result = false;
synchronized (this) {
final MemoryEvent mre = this.mostRecentEvent;
final MemoryState oldState = mre.getState();
@@ -310,10 +301,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
if (_testHook != null) {
_testHook.updateStateAndSendEventBeforeProcess(bytesUsed, event);
}
- this.upcomingEvent.set(event);
-
+ this.mostRecentEvent = event;
processLocalEvent(event);
updateStatsFromEvent(event);
+ result = true;
} else if (!oldState.isNormal()
&& bytesUsed != mre.getBytesUsed()
@@ -323,8 +314,9 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
if (_testHook != null) {
_testHook.updateStateAndSendEventBeforeAbnormalProcess(bytesUsed, event);
}
- this.upcomingEvent.set(event);
+ this.mostRecentEvent = event;
processLocalEvent(event);
+ result = true;
} else {
if (_testHook != null) {
_testHook.updateStateAndSendEventIgnore(bytesUsed, oldState, newState, mre.getBytesUsed(), this.deliverNextAbnormalEvent);
@@ -332,6 +324,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
}
}
+ return result;
}
/**
@@ -343,17 +336,9 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
* @return true if a new event might need to be sent
*/
private boolean mightSendEvent(long bytesUsed) {
- if (this.offHeapMemoryUsageListener.offHeapMemoryUsed != bytesUsed) {
- // fix for GEODE-438
- return true;
- }
- // We do the following if the memory is the same just in case
- // corner cases exist in which even though the amount of memory
- // has not changed the we might have a different state.
- // It is possible that the following code is not needed.
final MemoryEvent mre = this.mostRecentEvent;
final MemoryState oldState = mre.getState();
- final MemoryThresholds thresholds = this.thresholds;
+ final MemoryThresholds thresholds = mre.getThresholds();
MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
if (oldState != newState) {
return true;
@@ -409,11 +394,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
*/
@Override
public void fillInProfile(final ResourceManagerProfile profile) {
- final MemoryEvent tempEvent = this.upcomingEvent.get();
- if (tempEvent != null) {
- this.mostRecentEvent = tempEvent;
- this.upcomingEvent.set(null);
- }
final MemoryEvent eventToPopulate = this.mostRecentEvent;
profile.setOffHeapData(eventToPopulate.getBytesUsed(), eventToPopulate.getState(), eventToPopulate.getThresholds());
}
@@ -437,7 +417,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
if (this.memoryAllocator == null) {
return 0;
}
-
return this.memoryAllocator.getUsedMemory();
}
@@ -505,68 +484,49 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
}
class OffHeapMemoryUsageListener implements Runnable {
- volatile boolean stopRequested = false;
- /**
- * volatile so that mightSendEvent can check it w/o syncing
- */
- volatile long offHeapMemoryUsed; // In bytes
+ private boolean stopRequested = false;
- OffHeapMemoryUsageListener(final long offHeapMemoryUsed) {
- this.offHeapMemoryUsed = offHeapMemoryUsed;
+ OffHeapMemoryUsageListener() {
+ }
+
+ public synchronized void stop() {
+ this.stopRequested = true;
+ this.notifyAll();
}
@Override
public void run() {
getLogWriter().fine("OffHeapMemoryUsageListener is starting " + this);
- long lastOffHeapMemoryUsed;
- synchronized (this) {
- lastOffHeapMemoryUsed = this.offHeapMemoryUsed;
- }
- while (!this.stopRequested) {
- updateStateAndSendEvent(lastOffHeapMemoryUsed);
+ int callsWithNoEvent = 0;
+ final int MS_TIMEOUT = 10;
+ final int MAX_CALLS_WITH_NO_EVENT = 1000/MS_TIMEOUT;
+ boolean exitRunLoop = false;
+ while (!exitRunLoop) {
+ if (!updateStateAndSendEvent()) {
+ callsWithNoEvent++;
+ if (callsWithNoEvent > MAX_CALLS_WITH_NO_EVENT) {
+ deliverNextAbnormalEvent();
+ callsWithNoEvent = 0;
+ }
+ } else {
+ callsWithNoEvent = 0;
+ }
synchronized (this) {
- long newOffHeapMemoryUsed = this.offHeapMemoryUsed;
if (this.stopRequested) {
- // no need to wait since we are stopping
- } else if (lastOffHeapMemoryUsed != newOffHeapMemoryUsed) {
- // no need to wait since memory used has changed
- // This fixes a race like bug GEODE-500
- lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
+ exitRunLoop = true;
} else {
- // wait for memory used to change
+ // Wait to be notified that off-heap memory changed
+ // or for the wait to timeout.
+ // In some cases we need to generate an event even
+ // when we have not been notified (see GEODE-438).
+ // So we don't want this wait to be for very long.
try {
- do {
- this.wait(1000);
- newOffHeapMemoryUsed = this.offHeapMemoryUsed;
- if (newOffHeapMemoryUsed == lastOffHeapMemoryUsed) {
- // The wait timed out. So tell the OffHeapMemoryMonitor
- // that we need an event if the state is not normal.
- deliverNextAbnormalEvent();
- // TODO: don't we need a "break" here?
- // As it is we set deliverNextAbnormalEvent
- // but then go back to sleep in wait.
- // We need to call updateStateAndSendEvent
- // which tests deliverNextAbnormalEvent.
- // But just adding a break is probably not enough.
- // We only set deliverNextAbnormalEvent if the wait
- // timed out which means that the amount of offHeapMemoryUsed
- // did not change.
- // But in updateStateAndSendEvent we only deliver an
- // abnormal event if the amount of memory changed.
- // This code needs to be reviewed with Swapnil but
- // it looks to Darrel like deliverNextAbnormalEvent
- // can be removed.
- } else {
- // we have been notified so exit the inner while loop
- // and call updateStateAndSendEvent.
- lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
- break;
- }
- } while (!this.stopRequested);
+ this.wait(MS_TIMEOUT);
} catch (InterruptedException iex) {
getLogWriter().warning("OffHeapMemoryUsageListener was interrupted " + this);
this.stopRequested = true;
+ exitRunLoop = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1d0ea245/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
index ca44d93..f87ac55 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
@@ -2057,8 +2057,6 @@ public class LocalizedStrings extends ParentLocalizedStrings {
public static final StringId GatewayImpl_GATEWAY_0_HAS_BEEN_REBALANCED = new StringIdImpl(5621, "GatewaySender {0} has been rebalanced");
- public static final StringId OffHeapMemoryMonitor_NO_OFF_HEAP_MEMORY_HAS_BEEN_CONFIGURED = new StringIdImpl(5622, "No off-heap memory has been configured.");
-
public static final StringId Oplog_Close_Failed = new StringIdImpl(5640, "Failed to close file {0}");
public static final StringId Oplog_PreAllocate_Failure = new StringIdImpl(5641, "Could not pre-allocate file {0} with size={1}");
public static final StringId Oplog_PreAllocate_Failure_Init = new StringIdImpl(5642, "Could not create and pre grow file in dir {0} with size={1}");