You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/10/13 17:50:14 UTC

incubator-geode git commit: GEODE-1999: Fix offheap memory leak when exception is thrown during basicDestroy call to remove GatewaySenderEventImpl from the sender queue

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 582694d3d -> 08adacd2c


GEODE-1999: Fix offheap memory leak when exception is thrown during basicDestroy call to remove GatewaySenderEventImpl from the sender queue

Using try and finally to make sure the offheap reference will be released.
Make similar changes for the parrellel wan queue as well.
Also release offheap memory if a virtualPut failed to put the GatewaySenderEvent into the sender queue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/08adacd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/08adacd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/08adacd2

Branch: refs/heads/develop
Commit: 08adacd2cfb93533ec016a82a0f71d7110e1819d
Parents: 582694d
Author: eshu <es...@pivotal.io>
Authored: Thu Oct 13 10:44:53 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Thu Oct 13 10:44:53 2016 -0700

----------------------------------------------------------------------
 .../cache/AbstractBucketRegionQueue.java        | 34 +++++------
 .../geode/internal/cache/BucketRegionQueue.java | 59 +++++++++++---------
 .../wan/serial/SerialGatewaySenderQueue.java    | 26 ++++++---
 3 files changed, 68 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 8fa8597..7ae1249 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -357,31 +357,31 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
       boolean ifOld, Object expectedOldValue, boolean requireOldValue,
       long lastModified, boolean overwriteDestroyed) throws TimeoutException,
       CacheWriterException {
-    boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
-        requireOldValue, lastModified, overwriteDestroyed);
-    if (success) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Key : ----> {}", event.getKey());
+    try {
+      boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
+          requireOldValue, lastModified, overwriteDestroyed);
+      if (success) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Key : ----> {}", event.getKey());
+        }      
+      } else {
+        GatewaySenderEventImpl.release(event.getRawNewValue());
       }
-      //@Unretained Object ov = event.getRawOldValue();
-      //if (ov instanceof GatewaySenderEventImpl) {
-      //  ((GatewaySenderEventImpl)ov).release();
-      //}
-       GatewaySenderEventImpl.release(event.getRawOldValue());
+      return success;
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
     }
-    return success;
     
   }
   @Override
   protected void basicDestroy(final EntryEventImpl event,
       final boolean cacheWrite, Object expectedOldValue)
       throws EntryNotFoundException, CacheWriterException, TimeoutException {
-    super.basicDestroy(event, cacheWrite, expectedOldValue);
-    //@Unretained Object rov = event.getRawOldValue();
-    //if (rov instanceof GatewaySenderEventImpl) {
-    //  ((GatewaySenderEventImpl) rov).release();
-    //}
-	GatewaySenderEventImpl.release(event.getRawOldValue());
+    try {
+      super.basicDestroy(event, cacheWrite, expectedOldValue);
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 294b616..ecc659a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -257,34 +257,38 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       boolean ifOld, Object expectedOldValue, boolean requireOldValue,
       long lastModified, boolean overwriteDestroyed) throws TimeoutException,
       CacheWriterException {
-    boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
-        requireOldValue, lastModified, overwriteDestroyed);
-
-    if (success) {
-      GatewaySenderEventImpl.release(event.getRawOldValue());
-
-      if (getPartitionedRegion().getColocatedWith() == null) {
-        return success;
-      }
-
-      if (getPartitionedRegion().isConflationEnabled() && this.getBucketAdvisor().isPrimary()) {
-        Object object = event.getNewValue();
-        Long key = (Long)event.getKey();
-        if (object instanceof Conflatable) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Key :{} , Object : {} is conflatable", key, object);
-          }
-          // TODO: TO optimize by destroying on primary and secondary separately
-          // in case of conflation
-          conflateOldEntry((Conflatable)object, key);
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Object : {} is not conflatable", object);
+    try {
+      boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
+          requireOldValue, lastModified, overwriteDestroyed);
+  
+      if (success) {
+        if (getPartitionedRegion().getColocatedWith() == null) {
+          return success;
+        }
+  
+        if (getPartitionedRegion().isConflationEnabled() && this.getBucketAdvisor().isPrimary()) {
+          Object object = event.getNewValue();
+          Long key = (Long)event.getKey();
+          if (object instanceof Conflatable) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Key :{} , Object : {} is conflatable", key, object);
+            }
+            // TODO: TO optimize by destroying on primary and secondary separately
+            // in case of conflation
+            conflateOldEntry((Conflatable)object, key);
+          } else {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Object : {} is not conflatable", object);
+            }
           }
         }
+      } else {
+        GatewaySenderEventImpl.release(event.getRawNewValue());
       }
+      return success;
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
     }
-    return success;
   }
 
   private void conflateOldEntry(Conflatable object, Long tailKey) {
@@ -357,9 +361,12 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     if (getPartitionedRegion().isConflationEnabled()) {
       removeIndex((Long)event.getKey());
     }
-    super.basicDestroy(event, cacheWrite, expectedOldValue);
+    try {
+      super.basicDestroy(event, cacheWrite, expectedOldValue);
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
+    }
 
-    GatewaySenderEventImpl.release(event.getRawOldValue());
     // Primary buckets should already remove the key while peeking
     if (!this.getBucketAdvisor().isPrimary()) {
       if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 79b9d86..a22666c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1301,22 +1301,32 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     protected void basicDestroy(final EntryEventImpl event,
         final boolean cacheWrite, Object expectedOldValue)
         throws EntryNotFoundException, CacheWriterException, TimeoutException {
-
-      super.basicDestroy(event, cacheWrite, expectedOldValue);
-      GatewaySenderEventImpl.release(event.getRawOldValue());
+      try {
+        super.basicDestroy(event, cacheWrite, expectedOldValue);
+      } finally {
+        GatewaySenderEventImpl.release(event.getRawOldValue());
+      }
     }
     @Override
     protected boolean virtualPut(EntryEventImpl event, boolean ifNew,
         boolean ifOld, Object expectedOldValue, boolean requireOldValue,
         long lastModified, boolean overwriteDestroyed) throws TimeoutException,
         CacheWriterException {
-      boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
-          requireOldValue, lastModified, overwriteDestroyed);
-
-      if (success) {
+      try {
+        boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
+            requireOldValue, lastModified, overwriteDestroyed);
+        if (!success) {
+          //release offheap reference if GatewaySenderEventImpl is not put into 
+          //the region queue
+          GatewaySenderEventImpl.release(event.getRawNewValue());
+        }
+        return success;
+      } finally {
+        //GatewaySenderQueue probably only adding new events into the queue.
+        //Add the finally block just in case if there actually is an update 
+        //in the sender queue or occurs in the the future.
         GatewaySenderEventImpl.release(event.getRawOldValue());
       }
-      return success;
     }
   }
 }