You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/09/01 18:48:45 UTC

[07/37] incubator-geode git commit: GEODE-212: improve off-heap + ResourceManager performance

GEODE-212: improve off-heap + ResourceManager performance

The synchronous code that notifies the async listener now does
a quick check to see if an event might be needed. This is done
with no synchronization. It just does two volatile reads.
The only time the synchronized notify is now needed is if we might
have crossed a threshold boundary or if the listener indicates it
really wants an event.

The old code called System.currentTimeMillis every time it created
a MemoryEvent. This timestamp was only used by off-heap to reduce
how many notifications are sent while the state is not normal.
Heap sends notifications in abnormal states every time the amount
of memory changes. Off-heap, while abnormal, sends them at most once
per second by using the timestamp.
To get rid of all these currentTimeMillis calls, the new code has the
background OffHeapMemoryUsageListener time out its wait call every
second. If it times out, it sets a volatile flag that indicates it
wants the next abnormal event.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e865b35f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e865b35f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e865b35f

Branch: refs/heads/feature/GEODE-77
Commit: e865b35fa58196a7475efcc36d108e871d76ba4c
Parents: 751f2e9
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Aug 12 13:49:48 2015 -0700
Committer: Kirk Lund <kl...@pivotal.io>
Committed: Wed Aug 19 16:08:02 2015 -0700

----------------------------------------------------------------------
 .../internal/cache/control/MemoryEvent.java     |  9 +-
 .../cache/control/OffHeapMemoryMonitor.java     | 88 ++++++++++++++++----
 2 files changed, 72 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e865b35f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/MemoryEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/MemoryEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/MemoryEvent.java
index bda6518..c6dfd10 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/MemoryEvent.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/MemoryEvent.java
@@ -19,13 +19,12 @@ import com.gemstone.gemfire.internal.cache.control.MemoryThresholds.MemoryState;
  */
 public class MemoryEvent implements ResourceEvent {
   private final ResourceType type;
-  private volatile MemoryState state;
+  private final MemoryState state;
   private final MemoryState previousState;
   private final DistributedMember member;
   private final long bytesUsed;
   private final boolean isLocal;
   private final MemoryThresholds thresholds;
-  private final long eventTime;
  
   public MemoryEvent(final ResourceType type, final MemoryState previousState, final MemoryState state,
       final DistributedMember member, final long bytesUsed, final boolean isLocal, final MemoryThresholds thresholds) {
@@ -36,7 +35,6 @@ public class MemoryEvent implements ResourceEvent {
     this.bytesUsed = bytesUsed;
     this.isLocal = isLocal;
     this.thresholds = thresholds;
-    this.eventTime = System.currentTimeMillis();
   }
 
   @Override
@@ -66,10 +64,6 @@ public class MemoryEvent implements ResourceEvent {
     return this.isLocal;
   }
   
-  public long getEventTime() {
-    return this.eventTime;
-  }
-
   public MemoryThresholds getThresholds() {
     return this.thresholds;
   }
@@ -84,7 +78,6 @@ public class MemoryEvent implements ResourceEvent {
         .append(",state:" + this.state)
         .append(",bytesUsed:" + this.bytesUsed)
         .append(",isLocal:" + this.isLocal)
-        .append(",eventTime:" + this.eventTime)
         .append(",thresholds:" + this.thresholds + "]")
         .toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e865b35f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
index a1856e4..0678c01 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
@@ -119,6 +119,9 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
 
   @Override
   public void updateMemoryUsed(final long bytesUsed) {
+    if (!mightSendEvent(bytesUsed)) {
+      return;
+    }
     synchronized (this.offHeapMemoryUsageListener) {
       this.offHeapMemoryUsageListener.offHeapMemoryUsed = bytesUsed;
       this.offHeapMemoryUsageListener.notifyAll();
@@ -232,33 +235,70 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
    * Public for testing.
    * 
    * @param bytesUsed
-   *          Number of bytes of heap memory currently used.
+   *          Number of bytes of off-heap memory currently used.
    */
   public void updateStateAndSendEvent(long bytesUsed) {
     synchronized (this) {
-      MemoryState oldState = this.mostRecentEvent.getState();
-      MemoryState newState = this.thresholds.computeNextState(oldState, bytesUsed);
+      final MemoryEvent mre = this.mostRecentEvent;
+      final MemoryState oldState = mre.getState();
+      final MemoryThresholds thresholds = this.thresholds;
+      MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
       if (oldState != newState) {
         this.currentState = newState;
         
-        MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true,
-            this.thresholds);
+        MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true, thresholds);
         this.upcomingEvent.set(event);
 
         processLocalEvent(event);
         updateStatsFromEvent(event);
         
-      // The state didn't change.  However, if the state isn't normal and we've
-      // been in that state for a while, send another event with the updated
-      // memory usage.
-      } else if (!oldState.isNormal() && (System.currentTimeMillis() - this.mostRecentEvent.getEventTime()) > 1000) {
-        MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true,
-            this.thresholds);
+      } else if (!oldState.isNormal()
+          && bytesUsed != mre.getBytesUsed()
+          && this.deliverNextAbnormalEvent) {
+        this.deliverNextAbnormalEvent = false;
+        MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true, thresholds);
         this.upcomingEvent.set(event);
         processLocalEvent(event);
       }
     }
   }
+  
+  /**
+   * Return true if the given number of bytes compared to the
+   * current monitor state would generate a new memory event.
+   * 
+   * @param bytesUsed
+   *          Number of bytes of off-heap memory currently used.
+   * @return true if a new event might need to be sent
+   */
+  private boolean mightSendEvent(long bytesUsed) {
+    final MemoryEvent mre = this.mostRecentEvent;
+    final MemoryState oldState = mre.getState();
+    final MemoryThresholds thresholds = this.thresholds;
+    MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
+    if (oldState != newState) {
+      return true;
+    } else if (!oldState.isNormal()
+        && bytesUsed != mre.getBytesUsed()
+        && this.deliverNextAbnormalEvent) {
+      return true;
+    }
+    return false;
+  }
+  
+  private volatile boolean deliverNextAbnormalEvent = false;
+   
+  /**
+   * Used by the OffHeapMemoryUsageListener to tell us that
+   * the next abnormal event should be delivered even if the
+   * state does not change as long as the memory usage changed.
+   * For some reason, unknown to me, if we stay in an abnormal
+   * state for more than a second then we want to send another
+   * event to update the memory usage.
+   */
+  void deliverNextAbnormalEvent() {
+    this.deliverNextAbnormalEvent = true;
+  }
 
   /**
    * Update resource manager stats based upon the given event.
@@ -387,7 +427,7 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
   
   class OffHeapMemoryUsageListener implements Runnable {
     volatile boolean stopRequested = false;
-    volatile long offHeapMemoryUsed; // In bytes
+    long offHeapMemoryUsed; // In bytes
     
     OffHeapMemoryUsageListener(final long offHeapMemoryUsed) {
       this.offHeapMemoryUsed = offHeapMemoryUsed;
@@ -396,15 +436,29 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
     @Override
     public void run() {
       getLogWriter().fine("OffHeapMemoryUsageListener is starting " + this);
-
+      long lastOffHeapMemoryUsed;
+      synchronized (this) {
+        lastOffHeapMemoryUsed = this.offHeapMemoryUsed;
+      }
       while (!this.stopRequested) {
-        final long saveOffHeapMemoryUsed = this.offHeapMemoryUsed;
-        updateStateAndSendEvent(saveOffHeapMemoryUsed);
+        updateStateAndSendEvent(lastOffHeapMemoryUsed);
 
         synchronized (this) {
-          if (saveOffHeapMemoryUsed == this.offHeapMemoryUsed && !this.stopRequested) {
+          if (lastOffHeapMemoryUsed == this.offHeapMemoryUsed && !this.stopRequested) {
             try {
-              this.wait();
+              do {
+                this.wait(1000);
+                if (this.offHeapMemoryUsed == lastOffHeapMemoryUsed) {
+                  // The wait timed out. So tell the OffHeapMemoryMonitor
+                  // that we need an event if the state is not normal.
+                  deliverNextAbnormalEvent();
+                } else {
+                  // we have been notified so exit the inner while loop
+                  // and call updateStateAndSendEvent.
+                  lastOffHeapMemoryUsed = this.offHeapMemoryUsed;
+                  break;
+                }
+              } while (true);
             } catch (InterruptedException iex) {
               getLogWriter().warning("OffHeapMemoryUsageListener was interrupted " + this);
               this.stopRequested = true;