You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2015/11/02 22:53:03 UTC

[37/50] [abbrv] incubator-geode git commit: GEODE-96: fix race in unit test

GEODE-96: fix race in unit test

It looks like the problem might have been that the
test only waited 5 seconds for an event.
It now waits 30 seconds. It also now can sense more
of what is happening on the OffHeapMemoryMonitor
using the new testHook OffHeapMemoryMonitorObserver.
If this problem reproduces then the test can add
additional validation to its observer to narrow down
what the cause of the missing event is.
This also fixes GEODE-348.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6aadbf81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6aadbf81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6aadbf81

Branch: refs/heads/feature/GEODE-328
Commit: 6aadbf81c8d8549cd82ae73994997f3a9e83c060
Parents: f437106
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Oct 23 13:40:22 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Oct 23 13:43:04 2015 -0700

----------------------------------------------------------------------
 .../cache/control/OffHeapMemoryMonitor.java     |  61 ++++++++-
 .../MemoryThresholdsOffHeapDUnitTest.java       | 132 +++++++++++++++++--
 2 files changed, 182 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6aadbf81/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
index 0678c01..27780cc 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/OffHeapMemoryMonitor.java
@@ -117,15 +117,48 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
     }
   }
 
+  public volatile OffHeapMemoryMonitorObserver testHook;
+  
+  /**
+   * Used by unit tests to be notified when OffHeapMemoryMonitor
+   * does something.
+   */
+  public static interface OffHeapMemoryMonitorObserver {
+    /**
+     * Called at the beginning of updateMemoryUsed.
+     * @param bytesUsed the number of bytes of off-heap memory currently used
+     * @param willSendEvent true if an event will be sent to the OffHeapMemoryUsageListener. 
+     */
+    public void beginUpdateMemoryUsed(long bytesUsed, boolean willSendEvent);
+    public void afterNotifyUpdateMemoryUsed(long bytesUsed);
+    /**
+     * Called at the beginning of updateStateAndSendEvent.
+     * @param bytesUsed the number of bytes of off-heap memory currently used
+     * @param willSendEvent true if an event will be sent to the OffHeapMemoryUsageListener. 
+     */
+    public void beginUpdateStateAndSendEvent(long bytesUsed, boolean willSendEvent);
+    public void updateStateAndSendEventBeforeProcess(long bytesUsed, MemoryEvent event);
+    public void updateStateAndSendEventBeforeAbnormalProcess(long bytesUsed, MemoryEvent event);
+    public void updateStateAndSendEventIgnore(long bytesUsed, MemoryState oldState, MemoryState newState, long mostRecentBytesUsed,
+        boolean deliverNextAbnormalEvent);
+  }
   @Override
   public void updateMemoryUsed(final long bytesUsed) {
-    if (!mightSendEvent(bytesUsed)) {
+    final boolean willSendEvent = mightSendEvent(bytesUsed);
+    final OffHeapMemoryMonitorObserver _testHook = this.testHook;
+    if (_testHook != null) {
+      _testHook.beginUpdateMemoryUsed(bytesUsed, willSendEvent);
+    }
+    if (!willSendEvent) {
       return;
     }
     synchronized (this.offHeapMemoryUsageListener) {
       this.offHeapMemoryUsageListener.offHeapMemoryUsed = bytesUsed;
       this.offHeapMemoryUsageListener.notifyAll();
     }
+    if (_testHook != null) {
+      _testHook.afterNotifyUpdateMemoryUsed(bytesUsed);
+    }
   }
   
   void setCriticalThreshold(final float criticalThreshold) {
@@ -242,11 +275,15 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
       final MemoryEvent mre = this.mostRecentEvent;
       final MemoryState oldState = mre.getState();
       final MemoryThresholds thresholds = this.thresholds;
+      final OffHeapMemoryMonitorObserver _testHook = this.testHook;
       MemoryState newState = thresholds.computeNextState(oldState, bytesUsed);
       if (oldState != newState) {
         this.currentState = newState;
         
         MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true, thresholds);
+        if (_testHook != null) {
+          _testHook.updateStateAndSendEventBeforeProcess(bytesUsed, event);
+        }
         this.upcomingEvent.set(event);
 
         processLocalEvent(event);
@@ -257,8 +294,16 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
           && this.deliverNextAbnormalEvent) {
         this.deliverNextAbnormalEvent = false;
         MemoryEvent event = new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldState, newState, this.cache.getMyId(), bytesUsed, true, thresholds);
+        if (_testHook != null) {
+          _testHook.updateStateAndSendEventBeforeAbnormalProcess(bytesUsed, event);
+        }
         this.upcomingEvent.set(event);
         processLocalEvent(event);
+      } else {
+        if (_testHook != null) {
+          _testHook.updateStateAndSendEventIgnore(bytesUsed, oldState, newState, mre.getBytesUsed(), this.deliverNextAbnormalEvent);
+        }
+        
       }
     }
   }
@@ -452,6 +497,20 @@ public class OffHeapMemoryMonitor implements ResourceMonitor, MemoryUsageListene
                   // The wait timed out. So tell the OffHeapMemoryMonitor
                   // that we need an event if the state is not normal.
                   deliverNextAbnormalEvent();
+                  // TODO: don't we need a "break" here?
+                  //       As it is we set deliverNextAbnormalEvent
+                  //       but then go back to sleep in wait.
+                  //       We need to call updateStateAndSendEvent
+                  //       which tests deliverNextAbnormalEvent.
+                  // But just adding a break is probably not enough.
+                  // We only set deliverNextAbnormalEvent if the wait
+                  // timed out which means that the amount of offHeapMemoryUsed
+                  // did not change.
+                  // But in updateStateAndSendEvent we only deliver an
+                  // abnormal event if the amount of memory changed.
+                  // This code needs to be reviewed with Swapnil but
+                  // it looks to Darrel like deliverNextAbnormalEvent
+                  // can be removed.
                 } else {
                   // we have been notified so exit the inner while loop
                   // and call updateStateAndSendEvent.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6aadbf81/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
index d65dcc7..816d668 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/management/MemoryThresholdsOffHeapDUnitTest.java
@@ -53,6 +53,7 @@ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.Resou
 import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
 import com.gemstone.gemfire.internal.cache.control.MemoryThresholds.MemoryState;
 import com.gemstone.gemfire.internal.cache.control.OffHeapMemoryMonitor;
+import com.gemstone.gemfire.internal.cache.control.OffHeapMemoryMonitor.OffHeapMemoryMonitorObserver;
 import com.gemstone.gemfire.internal.cache.control.ResourceAdvisor;
 import com.gemstone.gemfire.internal.cache.control.ResourceListener;
 import com.gemstone.gemfire.internal.cache.control.TestMemoryThresholdListener;
@@ -1383,6 +1384,8 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
     final Host host = Host.getHost(0);
     final VM server = host.getVM(0);
     final VM client = host.getVM(1);
+    final Object bigKey = -1;
+    final Object smallKey = -2;
 
     final int port = AvailablePortHelper.getRandomAvailableTCPPort();
     final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
@@ -1394,20 +1397,52 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
     doPutAlls(client, regionName, false/*catchServerException*/,
         false/*catchLowMemoryException*/, Range.DEFAULT);
 
+    
     //make the region sick in the server
-    server.invoke(new SerializableRunnable() {
-      public void run() {
+    final long bytesUsedAfterSmallKey = (long)server.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
         InternalResourceManager irm = ((GemFireCacheImpl)getCache()).getResourceManager();
         final OffHeapMemoryMonitor ohm = irm.getOffHeapMonitor();
         assertTrue(ohm.getState().isNormal());
         getCache().getLoggerI18n().fine(addExpectedExString);
         final LocalRegion r = (LocalRegion) getRootRegion().getSubregion(regionName);
-        final Object key = 1;
-        r.put(key, new byte[943720]);
+        final long bytesUsedAfterSmallKey;
+        {
+          OffHeapMemoryMonitorObserverImpl _testHook = new OffHeapMemoryMonitorObserverImpl();
+          ohm.testHook = _testHook;
+          try {
+            r.put(smallKey, "1234567890");
+            bytesUsedAfterSmallKey = _testHook.verifyBeginUpdateMemoryUsed(false);
+          } finally {
+            ohm.testHook = null;
+          }
+        }
+        {
+          final OffHeapMemoryMonitorObserverImpl th = new OffHeapMemoryMonitorObserverImpl();
+          ohm.testHook = th;
+          try {
+            r.put(bigKey, new byte[943720]);
+            th.verifyBeginUpdateMemoryUsed(bytesUsedAfterSmallKey + 943720 + 8, true);
+            WaitCriterion waitForCritical = new WaitCriterion() {
+              public boolean done() {
+                return th.checkUpdateStateAndSendEventBeforeProcess(bytesUsedAfterSmallKey + 943720 + 8, MemoryState.EVICTION_DISABLED_CRITICAL);
+              }
+              @Override
+              public String description() {
+                return null;
+              }
+            };
+            waitForCriterion(waitForCritical, 30*1000, 9, false);
+            th.validateUpdateStateAndSendEventBeforeProcess(bytesUsedAfterSmallKey + 943720 + 8, MemoryState.EVICTION_DISABLED_CRITICAL);
+          } finally {
+            ohm.testHook = null;
+          }
+        }
         WaitCriterion wc;
         if (r instanceof PartitionedRegion) {
           final PartitionedRegion pr = (PartitionedRegion) r;
-          final int bucketId = PartitionedRegionHelper.getHashKey(pr, null, key, null, null);
+          final int bucketId = PartitionedRegionHelper.getHashKey(pr, null, bigKey, null, null);
           wc = new WaitCriterion() {
             @Override
             public String description() {
@@ -1419,7 +1454,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
               if (!ohm.getState().isCritical()) return false;
               // Only done once the bucket has been marked sick
               try {
-                pr.getRegionAdvisor().checkIfBucketSick(bucketId, key);
+                pr.getRegionAdvisor().checkIfBucketSick(bucketId, bigKey);
                 return false;
               } catch (LowMemoryException ignore) {
                 return true;
@@ -1439,9 +1474,9 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
             }
           };
         }
-        waitForCriterion(wc, 5000, 100, true);
+        waitForCriterion(wc, 30000, 9, true);
         getCache().getLoggerI18n().fine(removeExpectedExString);
-        return;
+        return bytesUsedAfterSmallKey;
       }
     });
 
@@ -1458,7 +1493,14 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
         final OffHeapMemoryMonitor ohm = irm.getOffHeapMonitor();
         assertTrue(ohm.getState().isCritical());
         getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.addExpectedBelow);
-        getRootRegion().getSubregion(regionName).destroy(1);
+        OffHeapMemoryMonitorObserverImpl _testHook = new OffHeapMemoryMonitorObserverImpl();
+        ohm.testHook = _testHook;
+        try {
+          getRootRegion().getSubregion(regionName).destroy(bigKey);
+          _testHook.verifyBeginUpdateMemoryUsed(bytesUsedAfterSmallKey, true);
+        } finally {
+          ohm.testHook = null;
+        }
         WaitCriterion wc = new WaitCriterion() {
           @Override
           public String description() {
@@ -1470,13 +1512,83 @@ public class MemoryThresholdsOffHeapDUnitTest extends ClientServerTestCase {
             return ohm.getState().isNormal();
           }
         };
-        waitForCriterion(wc, 5000, 100, true);
+        waitForCriterion(wc, 30000, 9, true);
         getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.removeExpectedBelow);
         return;
       }
     });
   }
   
+  private static class OffHeapMemoryMonitorObserverImpl implements OffHeapMemoryMonitorObserver {
+    private boolean beginUpdateMemoryUsed;
+    private long beginUpdateMemoryUsed_bytesUsed;
+    private boolean beginUpdateMemoryUsed_willSendEvent;
+    @Override
+    public synchronized void beginUpdateMemoryUsed(long bytesUsed, boolean willSendEvent) {
+      beginUpdateMemoryUsed = true;
+      beginUpdateMemoryUsed_bytesUsed = bytesUsed;
+      beginUpdateMemoryUsed_willSendEvent = willSendEvent;
+    }
+    @Override
+    public synchronized void afterNotifyUpdateMemoryUsed(long bytesUsed) {
+    }
+    @Override
+    public synchronized void beginUpdateStateAndSendEvent(long bytesUsed, boolean willSendEvent) {
+    }
+    private boolean updateStateAndSendEventBeforeProcess;
+    private long updateStateAndSendEventBeforeProcess_bytesUsed;
+    private MemoryEvent updateStateAndSendEventBeforeProcess_event;
+    @Override
+    public synchronized void updateStateAndSendEventBeforeProcess(long bytesUsed, MemoryEvent event) {
+      updateStateAndSendEventBeforeProcess = true;
+      updateStateAndSendEventBeforeProcess_bytesUsed = bytesUsed;
+      updateStateAndSendEventBeforeProcess_event = event;
+    }
+    @Override
+    public synchronized void updateStateAndSendEventBeforeAbnormalProcess(long bytesUsed, MemoryEvent event) {
+    }
+    @Override
+    public synchronized void updateStateAndSendEventIgnore(long bytesUsed, MemoryState oldState, MemoryState newState, long mostRecentBytesUsed,
+        boolean deliverNextAbnormalEvent) {
+    }
+
+    public synchronized void verifyBeginUpdateMemoryUsed(long expected_bytesUsed, boolean expected_willSendEvent) {
+      if (!beginUpdateMemoryUsed) {
+        fail("beginUpdateMemoryUsed was not called");
+      }
+      assertEquals(expected_bytesUsed, beginUpdateMemoryUsed_bytesUsed);
+      assertEquals(expected_willSendEvent, beginUpdateMemoryUsed_willSendEvent);
+    }
+    /**
+     * Verify that beginUpdateMemoryUsed was called with the expected willSendEvent value, and return the "bytesUsed" it recorded.
+     */
+    public synchronized long verifyBeginUpdateMemoryUsed(boolean expected_willSendEvent) {
+      if (!beginUpdateMemoryUsed) {
+        fail("beginUpdateMemoryUsed was not called");
+      }
+      assertEquals(expected_willSendEvent, beginUpdateMemoryUsed_willSendEvent);
+      return beginUpdateMemoryUsed_bytesUsed;
+    }
+    public synchronized boolean checkUpdateStateAndSendEventBeforeProcess(long expected_bytesUsed, MemoryState expected_memoryState) {
+      if (!updateStateAndSendEventBeforeProcess) {
+        return false;
+      }
+      if (expected_bytesUsed != updateStateAndSendEventBeforeProcess_bytesUsed) {
+        return false;
+      }
+      if (!expected_memoryState.equals(updateStateAndSendEventBeforeProcess_event.getState())) {
+        return false;
+      }
+      return true;
+    }
+    public synchronized void validateUpdateStateAndSendEventBeforeProcess(long expected_bytesUsed, MemoryState expected_memoryState) {
+      if (!updateStateAndSendEventBeforeProcess) {
+        fail("updateStateAndSendEventBeforeProcess was not called");
+      }
+      assertEquals(expected_bytesUsed, updateStateAndSendEventBeforeProcess_bytesUsed);
+      assertEquals(expected_memoryState, updateStateAndSendEventBeforeProcess_event.getState());
+    }
+   }
   private void registerTestMemoryThresholdListener(VM vm) {
     vm.invoke(new SerializableCallable() {
       public Object call() throws Exception {