You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ma...@apache.org on 2015/11/23 21:49:10 UTC

[40/50] [abbrv] incubator-geode git commit: GEODE-438: fix race in OffHeapMemoryMonitor. This fix also fixes GEODE-488 and GEODE-551. Also fixed test to use standard locators for connectivity instead of an mcast-port.

GEODE-438: fix race in OffHeapMemoryMonitor
This fix also fixes GEODE-488 and GEODE-551.
Also fixed test to use standard locators for connectivity
instead of an mcast-port.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/59ab8cf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/59ab8cf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/59ab8cf5

Branch: refs/heads/feature/GEODE-53
Commit: 59ab8cf5aef00dddb9587ae9b7964de277630d35
Parents: 4b904ce
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Nov 17 17:20:59 2015 -0800
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Nov 19 14:35:12 2015 -0800

----------------------------------------------------------------------
 .../cache/control/InternalResourceManager.java  |   2 +-
 .../cache/control/OffHeapMemoryMonitor.java     | 137 +++++++++----------
 .../MemoryThresholdsOffHeapDUnitTest.java       |  17 ++-
 3 files changed, 74 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/59ab8cf5/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
index 81d46ff..98e1f25 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/InternalResourceManager.java
@@ -165,7 +165,7 @@ public class InternalResourceManager implements ResourceManager {
     // Create the monitors
     Map<ResourceType, ResourceMonitor> tempMonitors = new HashMap<ResourceType, ResourceMonitor>();
     tempMonitors.put(ResourceType.HEAP_MEMORY, new HeapMemoryMonitor(this, cache, this.stats));
-    tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, this.stats));
+    tempMonitors.put(ResourceType.OFFHEAP_MEMORY, new OffHeapMemoryMonitor(this, cache, cache.getOffHeapStore(), this.stats));
     this.resourceMonitors = Collections.unmodifiableMap(tempMonitors);
     
     // Initialize the listener sets so that it only needs to be done once

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/59ab8cf5/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
index 721e9a6..3ab39ea 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
@@ -48,9 +48,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
       MemoryState.DISABLED, null, 0L, true, this.thresholds);
   private volatile MemoryState currentState = MemoryState.DISABLED;
 
-  // This holds a new event as it transitions from updateStateAndSendEvent(...) to fillInProfile()
-  private ThreadLocal<MemoryEvent> upcomingEvent = new ThreadLocal<MemoryEvent>();
-  
   // Set when startMonitoring() and stopMonitoring() are called
   // Package private for testing
   Boolean started = false;
@@ -58,28 +55,33 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   // Set to true when setEvictionThreshold(...) is called.
   private boolean hasEvictionThreshold = false;
 
-  private OffHeapMemoryUsageListener offHeapMemoryUsageListener;
   private Thread memoryListenerThread;
   
+  private final OffHeapMemoryUsageListener offHeapMemoryUsageListener;
   private final InternalResourceManager resourceManager;
   private final ResourceAdvisor resourceAdvisor;
   private final GemFireCacheImpl cache;
   private final ResourceManagerStats stats;
+  /**
+   * InternalResourceManager insists on creating an OffHeapMemoryMonitor even when it
+   * does not have off-heap memory. So we need to handle memoryAllocator being null.
+   */
   private final MemoryAllocator memoryAllocator;
   private final LogWriterI18n log;
 
-  OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final ResourceManagerStats stats) {
+  OffHeapMemoryMonitor(final InternalResourceManager resourceManager, final GemFireCacheImpl cache, final MemoryAllocator memoryAllocator, final ResourceManagerStats stats) {
     this.resourceManager = resourceManager;
     this.resourceAdvisor = (ResourceAdvisor) cache.getDistributionAdvisor();
     this.cache = cache;
     this.stats = stats;
     
-    this.memoryAllocator = cache.getOffHeapStore();
-    if (this.memoryAllocator != null) {
+    this.memoryAllocator = memoryAllocator;
+    if (memoryAllocator != null) {
       this.thresholds = new MemoryThresholds(this.memoryAllocator.getTotalMemory());
     }
     
     this.log = cache.getLoggerI18n();
+    this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener();
   }
 
   /**
@@ -92,7 +94,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         return;
       }
 
-      this.offHeapMemoryUsageListener = new OffHeapMemoryUsageListener(getBytesUsed());
       ThreadGroup group = LoggingThreadGroup.createThreadGroup("OffHeapMemoryMonitor Threads", logger);
       Thread t = new Thread(group, this.offHeapMemoryUsageListener);
       t.setName(t.getName() + " OffHeapMemoryListener");
@@ -123,10 +124,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
       
       this.memoryAllocator.removeMemoryUsageListener(this);
       
-      this.offHeapMemoryUsageListener.stopRequested = true;
-      synchronized (this.offHeapMemoryUsageListener) {
-        this.offHeapMemoryUsageListener.notifyAll();
-      }
+      this.offHeapMemoryUsageListener.stop();
       if (waitForThread) {
         threadToWaitFor = this.memoryListenerThread;
       }
@@ -177,10 +175,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
     if (!willSendEvent) {
       return;
     }
-    synchronized (this.offHeapMemoryUsageListener) {
-      this.offHeapMemoryUsageListener.offHeapMemoryUsed = bytesUsed;
-      this.offHeapMemoryUsageListener.notifyAll();
-    }
+    this.offHeapMemoryUsageListener.deliverEvent();
     if (_testHook != null) {
       _testHook.afterNotifyUpdateMemoryUsed(bytesUsed);
     }
@@ -281,9 +276,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   /**
    * Compare the number of bytes used (fetched from the JVM) to the thresholds.
    * If necessary, change the state and send an event for the state change.
+   * @return true if an event was sent
    */
-  public void updateStateAndSendEvent() {
-    updateStateAndSendEvent(getBytesUsed());
+  public boolean updateStateAndSendEvent() {
+    return updateStateAndSendEvent(getBytesUsed());
   }
 
   /**
@@ -294,8 +290,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
    * 
    * @param bytesUsed
    *          Number of bytes of off-heap memory currently used.
+   * @return true if an event was sent
    */
-  public void updateStateAndSendEvent(long bytesUsed) {
+  public boolean updateStateAndSendEvent(long bytesUsed) {
+    boolean result = false;
     synchronized (this) {
       final MemoryEvent mre = this.mostRecentEvent;
       final MemoryState oldState = mre.getState();
@@ -309,10 +307,10 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         if (_testHook != null) {
           _testHook.updateStateAndSendEventBeforeProcess(bytesUsed, event);
         }
-        this.upcomingEvent.set(event);
-
+        this.mostRecentEvent = event;
         processLocalEvent(event);
         updateStatsFromEvent(event);
+        result = true;
         
       } else if (!oldState.isNormal()
           && bytesUsed != mre.getBytesUsed()
@@ -322,8 +320,9 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         if (_testHook != null) {
           _testHook.updateStateAndSendEventBeforeAbnormalProcess(bytesUsed, event);
         }
-        this.upcomingEvent.set(event);
+        this.mostRecentEvent = event;
         processLocalEvent(event);
+        result = true;
       } else {
         if (_testHook != null) {
           _testHook.updateStateAndSendEventIgnore(bytesUsed, oldState, newState, mre.getBytesUsed(), this.deliverNextAbnormalEvent);
@@ -331,6 +330,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
         
       }
     }
+    return result;
   }
   
   /**
@@ -344,7 +344,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   private boolean mightSendEvent(long bytesUsed) {
     final MemoryEvent mre = this.mostRecentEvent;
     final MemoryState oldState = mre.getState();
-    final MemoryThresholds thresholds = this.thresholds;
+    final MemoryThresholds thresholds = mre.getThresholds();
     MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
     if (oldState != newState) {
       return true;
@@ -400,11 +400,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
    */
   @Override
   public void fillInProfile(final ResourceManagerProfile profile) {
-    final MemoryEvent tempEvent = this.upcomingEvent.get();
-    if (tempEvent != null) {
-      this.mostRecentEvent = tempEvent;
-      this.upcomingEvent.set(null);
-    }
     final MemoryEvent eventToPopulate = this.mostRecentEvent;
     profile.setOffHeapData(eventToPopulate.getBytesUsed(), eventToPopulate.getState(), eventToPopulate.getThresholds());
   }
@@ -428,7 +423,6 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
     if (this.memoryAllocator == null) {
       return 0;
     }
-    
     return this.memoryAllocator.getUsedMemory();
   }
 
@@ -496,65 +490,60 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   }
   
   class OffHeapMemoryUsageListener implements Runnable {
-    volatile boolean stopRequested = false;
-    long offHeapMemoryUsed; // In bytes
+    private boolean deliverEvent = false;
+    private boolean stopRequested = false;
     
-    OffHeapMemoryUsageListener(final long offHeapMemoryUsed) {
-      this.offHeapMemoryUsed = offHeapMemoryUsed;
+    OffHeapMemoryUsageListener() {
+    }
+    
+    public synchronized void deliverEvent() {
+      this.deliverEvent = true;
+      this.notifyAll();
+    }
+
+    public synchronized void stop() {
+      this.stopRequested = true;
+      this.notifyAll();
     }
     
     @Override
     public void run() {
       getLogWriter().fine("OffHeapMemoryUsageListener is starting " + this);
-      long lastOffHeapMemoryUsed;
-      synchronized (this) {
-        lastOffHeapMemoryUsed = this.offHeapMemoryUsed;
-      }
-      while (!this.stopRequested) {
-        updateStateAndSendEvent(lastOffHeapMemoryUsed);
+      int callsWithNoEvent = 0;
+      final int MS_TIMEOUT = 10;
+      final int MAX_CALLS_WITH_NO_EVENT = 1000/MS_TIMEOUT;
+      boolean exitRunLoop = false;
+      while (!exitRunLoop) {
+        if (!updateStateAndSendEvent()) {
+          callsWithNoEvent++;
+          if (callsWithNoEvent > MAX_CALLS_WITH_NO_EVENT) {
+            deliverNextAbnormalEvent();
+            callsWithNoEvent = 0;
+          }
+        } else {
+          callsWithNoEvent = 0;
+        }
 
         synchronized (this) {
-          long newOffHeapMemoryUsed = this.offHeapMemoryUsed;
           if (this.stopRequested) {
-            // no need to wait since we are stopping
-          } else if (lastOffHeapMemoryUsed != newOffHeapMemoryUsed) {
-            // no need to wait since memory used has changed
-            // This fixes a race like bug GEODE-500
-            lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
+            exitRunLoop = true;
+          } else if (this.deliverEvent) {
+            // No need to wait.
+            // Loop around and call updateStateAndSendEvent.
+            this.deliverEvent = false;
           } else {
-            // wait for memory used to change
+            // Wait to be notified that off-heap memory changed
+            // or for the wait to timeout.
+            // In some cases we need to generate an event even
+            // when we have not been notified (see GEODE-438).
+            // So we don't want this wait to be for very long.
             try {  
-              do {
-                this.wait(1000);
-                newOffHeapMemoryUsed = this.offHeapMemoryUsed;
-                if (newOffHeapMemoryUsed == lastOffHeapMemoryUsed) {
-                  // The wait timed out. So tell the OffHeapMemoryMonitor
-                  // that we need an event if the state is not normal.
-                  deliverNextAbnormalEvent();
-                  // TODO: don't we need a "break" here?
-                  //       As it is we set deliverNextAbnormalEvent
-                  //       but then go back to sleep in wait.
-                  //       We need to call updateStateAndSendEvent
-                  //       which tests deliverNextAbnormalEvent.
-                  // But just adding a break is probably not enough.
-                  // We only set deliverNextAbnormalEvent if the wait
-                  // timed out which means that the amount of offHeapMemoryUsed
-                  // did not change.
-                  // But in updateStateAndSendEvent we only deliver an
-                  // abnormal event if the amount of memory changed.
-                  // This code needs to be reviewed with Swapnil but
-                  // it looks to Darrel like deliverNextAbnormalEvent
-                  // can be removed.
-                } else {
-                  // we have been notified so exit the inner while loop
-                  // and call updateStateAndSendEvent.
-                  lastOffHeapMemoryUsed = newOffHeapMemoryUsed;
-                  break;
-                }
-              } while (!this.stopRequested);
+              this.wait(MS_TIMEOUT);
+              this.deliverEvent = false;
             } catch (InterruptedException iex) {
               getLogWriter().warning("OffHeapMemoryUsageListener was interrupted " + this);
               this.stopRequested = true;
+              exitRunLoop = true;
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/59ab8cf5/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
index a6f24d0..5399d28 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
@@ -524,7 +524,6 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
     final VM replicate1 = host.getVM(1);
     final VM replicate2 = host.getVM(2);
     final String rName = getUniqueName();
-    final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
     
     // Make sure the desired VMs will have a fresh DS.
     AsyncInvocation d1 = replicate1.invokeAsync(DistributedTestCase.class, "disconnectFromDS");
@@ -537,7 +536,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
       @SuppressWarnings("synthetic-access")
       @Override
       public void run2() throws CacheException {
-        getSystem(getServerProperties(mcastPort));
+        getSystem(getOffHeapProperties());
       }
     };
     replicate1.invoke(establishConnectivity);
@@ -547,7 +546,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
       @Override
       public void run2() throws CacheException {
         // Assert some level of connectivity
-        InternalDistributedSystem ds = getSystem(getServerProperties(mcastPort));
+        InternalDistributedSystem ds = getSystem(getOffHeapProperties());
         assertTrue(ds.getDistributionManager().getNormalDistributionManagerIds().size() >= 1);
 
         InternalResourceManager irm = (InternalResourceManager)getCache().getResourceManager();
@@ -1156,14 +1155,13 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
     final Host host = Host.getHost(0);
     final VM vm = host.getVM(2);
     final String rName = getUniqueName();
-    final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
 
     vm.invoke(DistributedTestCase.class, "disconnectFromDS");
     
     vm.invoke(new CacheSerializableRunnable("test LocalRegion load passthrough when critical") {
       @Override
       public void run2() throws CacheException {
-        getSystem(getServerProperties(mcastPort));
+        getSystem(getOffHeapProperties());
         InternalResourceManager irm = (InternalResourceManager)getCache().getResourceManager();
         final OffHeapMemoryMonitor ohmm = irm.getOffHeapMonitor();
         irm.setCriticalOffHeapPercentage(90f);
@@ -1821,12 +1819,17 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
     });
   }
   
-  private Properties getServerProperties(int mcastPort) {
+  private Properties getOffHeapProperties() {
     Properties p = new Properties();
+    p.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "1m");
+    return p;
+  }
+
+  private Properties getServerProperties(int mcastPort) {
+    Properties p = getOffHeapProperties();
     p.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastPort + "");
     p.setProperty(DistributionConfig.MCAST_TTL_NAME, "0");
     p.setProperty(DistributionConfig.LOCATORS_NAME, "");
-    p.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "1m");
     return p;
   }