You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/11/19 03:05:03 UTC

incubator-geode git commit: Listener thread will now wake up frequently and check if an event is needed

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-438 04afd82bc -> 1d0ea2458


Listener thread will now wake up frequently and check if an event is needed


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1d0ea245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1d0ea245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1d0ea245

Branch: refs/heads/feature/GEODE-438
Commit: 1d0ea245867fac19c0cbd58423d0fe19d4798945
Parents: 04afd82
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Nov 18 18:03:49 2015 -0800
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Nov 18 18:03:49 2015 -0800

----------------------------------------------------------------------
 .../cache/control/InternalResourceManager.java  |   3 +-
 .../cache/control/OffHeapMemoryMonitor.java     | 140 +++++++------------
 .../gemfire/internal/i18n/LocalizedStrings.java |   2 -
 3 files changed, 52 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1d0ea245/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
index 81d46ff..e7b514c 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
@@ -54,6 +54,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
 
 /**
  * Implementation of ResourceManager with additional internal-only methods.
@@ -165,7 +166,7 @@ public class InternalResourceManager implements ResourceManager {
     // Create the monitors
     Map<ResourceType, ResourceMonitor> tempMonitors = new HashMap<ResourceType, ResourceMonitor>();
     tempMonitors.put(ResourceType.HEAP_MEMORY, new HeapMemoryMonitor(this, cache, this.stats));
-    tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, this.stats));
+    tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, cache.getOffHeapStore(), this.stats));
     this.resourceMonitors = Collections.unmodifiableMap(tempMonitors);
     
     // Initialize the listener sets so that it only needs to be done once

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1d0ea245/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
index b2b56dc..4b28ddd 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
@@ -48,9 +48,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
       MemoryState.DISABLED, null, 0L, true, this.thresholds);
   private volatile MemoryState currentState = MemoryState.DISABLED;
 
-  // This holds a new event as it transitions from updateStateAndSendEvent(...) to fillInProfile()
-  private ThreadLocal<MemoryEvent> upcomingEvent = new ThreadLocal<MemoryEvent>();
-  
   // Set when startMonitoring() and stopMonitoring() are called
   // Package private for testing
   Boolean started = false;
@@ -65,22 +62,26 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   private final ResourceAdvisor resourceAdvisor;
   private final GemFireCacheImpl cache;
   private final ResourceManagerStats stats;
+  /**
+   * InternalResourceManager insists on creating an OffHeapMemoryMonitor even when it
+   * does not have off-heap memory. So we need to handle memoryAllocator being null.
+   */
   private final MemoryAllocator memoryAllocator;
   private final LogWriterI18n log;
 
-  OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final ResourceManagerStats stats) {
+  OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final MemoryAllocator memoryAllocator, final ResourceManagerStats stats) {
     this.resourceManager = resourceManager;
     this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
     this.cache = cache;
     this.stats = stats;
     
-    this.memoryAllocator = cache.getOffHeapStore();
-    if (this.memoryAllocator != null) {
+    this.memoryAllocator = memoryAllocator;
+    if (memoryAllocator != null) {
       this.thresholds = new MemoryThresholds(this.memoryAllocator.getTotalMemory());
     }
     
     this.log = cache.getLoggerI18n();
-    this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener(0L);
+    this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener();
   }
 
   /**
@@ -93,7 +94,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         return;
       }
 
-      this.offHeapMemoryUsageListener.offHeapMemoryUsed = getBytesUsed();
       ThreadGroup group = LoggingThreadGroup.createThreadGroup("OffHeapMemoryMonitor Threads", logger);
       Thread t = new Thread(group, this.offHeapMemoryUsageListener);
       t.setName(t.getName() + " OffHeapMemoryListener");
@@ -124,10 +124,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
       
       this.memoryAllocator.removeMemoryUsageListener(this);
       
-      this.offHeapMemoryUsageListener.stopRequested = true;
-      synchronized (this.offHeapMemoryUsageListener) {
-        this.offHeapMemoryUsageListener.notifyAll();
-      }
+      this.offHeapMemoryUsageListener.stop();
       if (waitForThread) {
         threadToWaitFor = this.memoryListenerThread;
       }
@@ -179,7 +176,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
       return;
     }
     synchronized (this.offHeapMemoryUsageListener) {
-      this.offHeapMemoryUsageListener.offHeapMemoryUsed = bytesUsed;
       this.offHeapMemoryUsageListener.notifyAll();
     }
     if (_testHook != null) {
@@ -199,10 +195,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_CRITICAL_PERCENTAGE_GT_ZERO_AND_LTE_100
             .toLocalizedString());
       }
-      if (this.memoryAllocator == null) {
-        throw new IllegalStateException(LocalizedStrings.OffHeapMemoryMonitor_NO_OFF_HEAP_MEMORY_HAS_BEEN_CONFIGURED
-            .toLocalizedString());
-      }
       if (criticalThreshold != 0 && this.thresholds.isEvictionThresholdEnabled()
           && criticalThreshold <= this.thresholds.getEvictionThreshold()) {
         throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_CRITICAL_PERCENTAGE_GTE_EVICTION_PERCENTAGE
@@ -249,10 +241,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         throw new IllegalArgumentException(LocalizedStrings.MemoryThresholds_EVICTION_PERCENTAGE_GT_ZERO_AND_LTE_100
             .toLocalizedString());
       }
-      if (this.memoryAllocator == null) {
-        throw new IllegalStateException(LocalizedStrings.OffHeapMemoryMonitor_NO_OFF_HEAP_MEMORY_HAS_BEEN_CONFIGURED
-            .toLocalizedString());
-      }
       if (evictionThreshold != 0 && this.thresholds.isCriticalThresholdEnabled()
           && evictionThreshold >= this.thresholds.getCriticalThreshold()) {
         throw new IllegalArgumentException(LocalizedStrings.MemoryMonitor_EVICTION_PERCENTAGE_LTE_CRITICAL_PERCENTAGE
@@ -282,9 +270,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   /**
    * Compare the number of bytes used (fetched from the JVM) to the thresholds.
    * If necessary, change the state and send an event for the state change.
+   * @return true if an event was sent
    */
-  public void updateStateAndSendEvent() {
-    updateStateAndSendEvent(getBytesUsed());
+  public boolean updateStateAndSendEvent() {
+    return updateStateAndSendEvent(getBytesUsed());
   }
 
   /**
@@ -295,8 +284,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
    * 
    * @param bytesUsed
    *          Number of bytes of off-heap memory currently used.
+   * @return true if an event was sent
    */
-  public void updateStateAndSendEvent(long bytesUsed) {
+  public boolean updateStateAndSendEvent(long bytesUsed) {
+    boolean result = false;
     synchronized (this) {
       final MemoryEvent mre = this.mostRecentEvent;
       final MemoryState oldState = mre.getState();
@@ -310,10 +301,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         if (_testHook != null) {
           _testHook.updateStateAndSendEventBeforeProcess(bytesUsed, event);
         }
-        this.upcomingEvent.set(event);
-
+        this.mostRecentEvent = event;
         processLocalEvent(event);
         updateStatsFromEvent(event);
+        result = true;
         
       } else if (!oldState.isNormal()
           && bytesUsed != mre.getBytesUsed()
@@ -323,8 +314,9 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         if (_testHook != null) {
           _testHook.updateStateAndSendEventBeforeAbnormalProcess(bytesUsed, event);
         }
-        this.upcomingEvent.set(event);
+        this.mostRecentEvent = event;
         processLocalEvent(event);
+        result = true;
       } else {
         if (_testHook != null) {
           _testHook.updateStateAndSendEventIgnore(bytesUsed, oldState, newState, mre.getBytesUsed(), this.deliverNextAbnormalEvent);
@@ -332,6 +324,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         
       }
     }
+    return result;
   }
   
   /**
@@ -343,17 +336,9 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
    * @return true if a new event might need to be sent
    */
   private boolean mightSendEvent(long bytesUsed) {
-    if (this.offHeapMemoryUsageListener.offHeapMemoryUsed != bytesUsed) {
-      // fix for GEODE-438
-      return true;
-    }
-    // We do the following if the memory is the same just in case
-    // corner cases exist in which even though the amount of memory
-    // has not changed the we might have a different state.
-    // It is possible that the following code is not needed.
     final MemoryEvent mre = this.mostRecentEvent;
     final MemoryState oldState = mre.getState();
-    final MemoryThresholds thresholds = this.thresholds;
+    final MemoryThresholds thresholds = mre.getThresholds();
     MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
     if (oldState != newState) {
       return true;
@@ -409,11 +394,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
    */
   @Override
   public void fillInProfile(final ResourceManagerProfile profile) {
-    final MemoryEvent tempEvent = this.upcomingEvent.get();
-    if (tempEvent != null) {
-      this.mostRecentEvent = tempEvent;
-      this.upcomingEvent.set(null);
-    }
     final MemoryEvent eventToPopulate = this.mostRecentEvent;
     profile.setOffHeapData(eventToPopulate.getBytesUsed(), eventToPopulate.getState(), eventToPopulate.getThresholds());
   }
@@ -437,7 +417,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
     if (this.memoryAllocator == null) {
       return 0;
     }
-    
     return this.memoryAllocator.getUsedMemory();
   }
 
@@ -505,68 +484,49 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   }
   
   class OffHeapMemoryUsageListener implements Runnable {
-    volatile boolean stopRequested = false;
-    /**
-     * volatile so that mightSendEvent can check it w/o syncing
-     */
-    volatile long offHeapMemoryUsed; // In bytes
+    private boolean stopRequested = false;
     
-    OffHeapMemoryUsageListener(final long offHeapMemoryUsed) {
-      this.offHeapMemoryUsed = offHeapMemoryUsed;
+    OffHeapMemoryUsageListener() {
+    }
+    
+    public synchronized void stop() {
+      this.stopRequested = true;
+      this.notifyAll();
     }
     
     @Override
     public void run() {
       getLogWriter().fine("OffHeapMemoryUsageListener is starting " + this);
-      long lastOffHeapMemoryUsed;
-      synchronized (this) {
-        lastOffHeapMemoryUsed = this.offHeapMemoryUsed;
-      }
-      while (!this.stopRequested) {
-        updateStateAndSendEvent(lastOffHeapMemoryUsed);
+      int callsWithNoEvent = 0;
+      final int MS_TIMEOUT = 10;
+      final int MAX_CALLS_WITH_NO_EVENT = 1000/MS_TIMEOUT;
+      boolean exitRunLoop = false;
+      while (!exitRunLoop) {
+        if (!updateStateAndSendEvent()) {
+          callsWithNoEvent++;
+          if (callsWithNoEvent > MAX_CALLS_WITH_NO_EVENT) {
+            deliverNextAbnormalEvent();
+            callsWithNoEvent = 0;
+          }
+        } else {
+          callsWithNoEvent = 0;
+        }
 
         synchronized (this) {
-          long newOffHeapMemoryUsed = this.offHeapMemoryUsed;
           if (this.stopRequested) {
-            // no need to wait since we are stopping
-          } else if (lastOffHeapMemoryUsed != newOffHeapMemoryUsed) {
-            // no need to wait since memory used has changed
-            // This fixes a race like bug GEODE-500
-            lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
+            exitRunLoop = true;
           } else {
-            // wait for memory used to change
+            // Wait to be notified that off-heap memory changed
+            // or for the wait to timeout.
+            // In some cases we need to generate an event even
+            // when we have not been notified (see GEODE-438).
+            // So we don't want this wait to be for very long.
             try {  
-              do {
-                this.wait(1000);
-                newOffHeapMemoryUsed = this.offHeapMemoryUsed;
-                if (newOffHeapMemoryUsed == lastOffHeapMemoryUsed) {
-                  // The wait timed out. So tell the OffHeapMemoryMonitor
-                  // that we need an event if the state is not normal.
-                  deliverNextAbnormalEvent();
-                  // TODO: don't we need a "break" here?
-                  //       As it is we set deliverNextAbnormalEvent
-                  //       but then go back to sleep in wait.
-                  //       We need to call updateStateAndSendEvent
-                  //       which tests deliverNextAbnormalEvent.
-                  // But just adding a break is probably not enough.
-                  // We only set deliverNextAbnormalEvent if the wait
-                  // timed out which means that the amount of offHeapMemoryUsed
-                  // did not change.
-                  // But in updateStateAndSendEvent we only deliver an
-                  // abnormal event if the amount of memory changed.
-                  // This code needs to be reviewed with Swapnil but
-                  // it looks to Darrel like deliverNextAbnormalEvent
-                  // can be removed.
-                } else {
-                  // we have been notified so exit the inner while loop
-                  // and call updateStateAndSendEvent.
-                  lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
-                  break;
-                }
-              } while (!this.stopRequested);
+              this.wait(MS_TIMEOUT);
             } catch (InterruptedException iex) {
               getLogWriter().warning("OffHeapMemoryUsageListener was interrupted " + this);
               this.stopRequested = true;
+              exitRunLoop = true;
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1d0ea245/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
index ca44d93..f87ac55 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
@@ -2057,8 +2057,6 @@ public class LocalizedStrings extends ParentLocalizedStrings {
 
   public static final StringId GatewayImpl_GATEWAY_0_HAS_BEEN_REBALANCED = new StringIdImpl(5621, "GatewaySender {0} has been rebalanced");
 
-  public static final StringId OffHeapMemoryMonitor_NO_OFF_HEAP_MEMORY_HAS_BEEN_CONFIGURED = new StringIdImpl(5622, "No off-heap memory has been configured.");
-
   public static final StringId Oplog_Close_Failed = new StringIdImpl(5640, "Failed to close file {0}");
   public static final StringId Oplog_PreAllocate_Failure = new StringIdImpl(5641, "Could not pre-allocate file {0} with size={1}");
   public static final StringId Oplog_PreAllocate_Failure_Init = new StringIdImpl(5642, "Could not create and pre grow file in dir {0} with size={1}");