You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/07/29 16:29:48 UTC

incubator-geode git commit: GEODE-1678: Fix offheap memory leak when gateway sender events fail to be enqueued to GatewaySenderQueue.

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 547fc4886 -> bf6cf4147


GEODE-1678: Fix offheap memory leak when gateway sender events fail to be enqueued to GatewaySenderQueue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bf6cf414
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bf6cf414
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bf6cf414

Branch: refs/heads/develop
Commit: bf6cf41474c14ea702d8d12de26c0012db4bc13f
Parents: 547fc48
Author: eshu <es...@pivotal.io>
Authored: Fri Jul 29 09:24:45 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Fri Jul 29 09:24:45 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/RegionQueue.java     |  3 ++-
 .../internal/cache/ha/HARegionQueue.java        | 16 +++++++++---
 .../ConcurrentParallelGatewaySenderQueue.java   |  2 +-
 .../ParallelGatewaySenderEventProcessor.java    |  7 +++---
 .../parallel/ParallelGatewaySenderQueue.java    | 25 +++++++++++--------
 .../SerialGatewaySenderEventProcessor.java      | 26 +++++++++++++++-----
 .../wan/serial/SerialGatewaySenderQueue.java    |  4 ++-
 .../cache/ha/TestBlockingHARegionQueue.java     |  7 +++---
 8 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
index 5108861..44e03bd 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
@@ -45,8 +45,9 @@ public interface RegionQueue
    * 
    * @throws InterruptedException
    * @throws CacheException
+   * @return boolean whether object was successfully put onto the queue
    */
-  public void put(Object object) throws InterruptedException, CacheException;
+  public boolean put(Object object) throws InterruptedException, CacheException;
 
   /**
    * Returns the underlying region that backs this queue.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
index 85b50a1..a9d5e6b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
@@ -573,7 +573,7 @@ public class HARegionQueue implements RegionQueue
   }
   
   /**
-   * Adds an object at the queue's tail. The implemetation supports concurrent
+   * Adds an object at the queue's tail. The implementation supports concurrent
    * put operations in a performant manner. This is done in following steps:
    * <br>
    * 1)Check if the Event being added has a sequence ID less than the Last
@@ -593,9 +593,9 @@ public class HARegionQueue implements RegionQueue
    *          object to put onto the queue
    * @throws InterruptedException
    * @throws CacheException
-   * 
+   * @return boolean always {@code true}; this does not indicate whether the object was actually added to the queue
    */
-  public void put(Object object) throws CacheException, InterruptedException {
+  public boolean put(Object object) throws CacheException, InterruptedException {
     this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event
     try {
       if (this.giiCount > 0) {
@@ -616,6 +616,16 @@ public class HARegionQueue implements RegionQueue
     } finally {
       this.giiLock.readLock().unlock();
     }
+    
+    // basicPut() invokes dace.putObject() to put the event onto the HARegionQueue.
+    // However, dace.putObject() can return true even when the event was not
+    // put onto the HARegionQueue (for example, because the event was elided).
+    // Its result is therefore not a reliable indication of whether ownership of
+    // the offheap reference was passed to the queue (if and when HARegionQueue
+    // uses offheap). A probable solution is to have dace.putObject() increment
+    // the offheap reference count when it puts the event onto the region queue,
+    // and to have the caller always release (decrement) its own reference.
+    return true;
   }
   
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index ccdf42a..e25f472 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -65,7 +65,7 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   }
   
   @Override
-  public void put(Object object) throws InterruptedException, CacheException {
+  public boolean put(Object object) throws InterruptedException, CacheException {
     throw new UnsupportedOperationException("CPGAQ method(put) is not supported");
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 11502af..1810427 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -134,6 +134,7 @@ public class ParallelGatewaySenderEventProcessor extends
 //        .isPdxTypesRegion())) {
 //      bucketId = PartitionedRegionHelper.getHashKey(event);
 //    }
+    boolean queuedEvent = false;
     try {
       EventID eventID = ((EntryEventImpl)event).getEventId();
 
@@ -142,11 +143,11 @@ public class ParallelGatewaySenderEventProcessor extends
       gatewayQueueEvent = new GatewaySenderEventImpl(operation, event,
           substituteValue, true, eventID.getBucketID());
 
+
       if (getSender().beforeEnqueue(gatewayQueueEvent)) {
         long start = getSender().getStatistics().startTime();
         try {
-          this.queue.put(gatewayQueueEvent);
-          gatewayQueueEvent = null;
+          queuedEvent = this.queue.put(gatewayQueueEvent);
         }
         catch (InterruptedException e) {
           e.printStackTrace();
@@ -161,7 +162,7 @@ public class ParallelGatewaySenderEventProcessor extends
       }
     }
     finally {
-      if (gatewayQueueEvent != null) {
+      if (!queuedEvent) {
         // it was not queued for some reason
         gatewayQueueEvent.release();
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index d21d6dc..1b5c11f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -676,9 +676,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
   
-  public void put(Object object) throws InterruptedException, CacheException {
+  public boolean put(Object object) throws InterruptedException, CacheException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    
+    boolean putDone = false;
     //Suranjan : Can this region ever be null? Should we work with regionName and not with region instance. 
     // It can't be as put is happeing on the region and its still under process
     GatewaySenderEventImpl value = (GatewaySenderEventImpl)object;
@@ -707,8 +707,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToshadowPRMap);
       }
       logger.warn(LocalizedMessage.create(LocalizedStrings.NOT_QUEUING_AS_USERPR_IS_NOT_YET_CONFIGURED, value));        
-      value.release();
-      return;
+      //does not put into queue
+      return false;
     }
     
     PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(regionPath);
@@ -724,8 +724,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         if (isDebugEnabled) {
           logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", key, value);
         }
-        value.release();
-        return;
+        //does not put into queue
+        return false;
       }  
     }else{
       key = value.getEventId();
@@ -759,6 +759,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             brq.getInitializationLock().readLock().lock();
             try {
               putIntoBucketRegionQueue(brq, key, value);
+              putDone = true;
             } finally {
               brq.getInitializationLock().readLock().unlock();
             }
@@ -767,7 +768,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             // above search then it means that bucket is not intended for this
             // node. So lets not add this event in temp queue event as we are
             // doing it for PRevent
-            value.release();
+            // does not put onto the queue
           } else {
             // We have to handle the case where brq is null because the
             // colocation
@@ -781,9 +782,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.",
                     key, value);
               }
-              value.release();
+              // does not put onto the queue
             } else {
-              /**
+              /*
                * This is to prevent data loss, in the scenario when bucket is
                * not available in the cache but we know that it will be created.
                */
@@ -803,6 +804,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                   brq.getInitializationLock().readLock().lock();
                   try {
                     putIntoBucketRegionQueue(brq, key, value);
+                    putDone = true;
                   } finally {
                     brq.getInitializationLock().readLock().unlock();
                   }
@@ -813,6 +815,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                   // this.bucketToTempQueueMap.put(bucketId, tempQueue);
                   // }
                   tempQueue.add(value);
+                  putDone = true;
                   // For debugging purpose.
                   if (isDebugEnabled) {
                     logger.debug("The value {} is enqueued to the tempQueue for the BucketRegionQueue.", value);
@@ -840,17 +843,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
         if (!thisbucketDestroyed) {
           putIntoBucketRegionQueue(brq, key, value);
+          putDone = true;
         } else {
           if (isDebugEnabled) {
             logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.",
                 key, value);
           }
-          value.release();
+          // does not put onto the queue
         }
       }
     } finally {
       notifyEventProcessorIfRequired();
     }
+    return putDone;
   }
 
   public void notifyEventProcessorIfRequired() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index ba839f4..2b1eb3d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -330,8 +330,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
             it.remove();
             boolean queuedEvent = false;
             try {
-              queuePrimaryEvent(gatewayEvent);
-              queuedEvent = true;
+              queuedEvent = queuePrimaryEvent(gatewayEvent);
             } catch (IOException ex) {
               if (!stopped()) {
                 logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_EVENT_DROPPED_DURING_FAILOVER_0, gatewayEvent), ex);
@@ -431,11 +430,24 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
       }
       // If it is, create and enqueue an initialized GatewayEventImpl
       senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue); // OFFHEAP ok
-      queuePrimaryEvent(senderEvent);
+      
+      boolean queuedEvent =false;
+      try {
+        queuedEvent = queuePrimaryEvent(senderEvent);
+      } finally {
+        // If queuePrimaryEvent() fails with an exception, the failure may have
+        // occurred after the GatewaySenderEventImpl was put onto the queue.
+        // In that case the GatewaySenderEventImpl is still released here, and an
+        // IllegalStateException could be thrown if getDeserializedValue is called
+        // when the event is later accessed through the region queue.
+        if (!queuedEvent) {
+          GatewaySenderEventImpl.release(senderEvent);
+        }
+      }
     }
   }
 
-  private void queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent)
+  private boolean queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent)
       throws IOException, CacheException {
     // Queue the event
     GatewaySenderStats statistics = this.sender.getStatistics();
@@ -447,11 +459,12 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
         logger.debug("Event {} is not added to queue.", gatewayEvent);
       }
       statistics.incEventsFiltered();
-      return;
+      return false;
     }
     long start = statistics.startTime();
+    boolean putDone = false;
     try {
-      this.queue.put(gatewayEvent);
+      putDone = this.queue.put(gatewayEvent);      
     } catch (InterruptedException e) {
       // Asif Not expected from SingleWriteSingleReadRegionQueue as it does not
       // throw
@@ -480,6 +493,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
               new Object[] { sender.getId(), Integer.valueOf(AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) }));
       this.eventQueueSizeWarning = true;
     }
+    return putDone;
   }
 
   protected void waitForFailoverCompletion() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index efa7870..1ae55ac 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -232,7 +232,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     getRegion().localDestroyRegion();
   }
 
-  public synchronized void put(Object event) throws CacheException {
+  public synchronized boolean put(Object event) throws CacheException {
     GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl)event;
     final Region r = eventImpl.getRegion();
     final boolean isPDXRegion = (r instanceof DistributedRegion && r.getName()
@@ -249,9 +249,11 @@ public class SerialGatewaySenderQueue implements RegionQueue {
 //      else {
 //        synchronized (this) {
           putAndGetKey(event);
+          return true;
         //}
       //}
     }
+    return false;    
   }
 
   private long putAndGetKey(Object object) throws CacheException {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
index d57290a..ced87bd 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
@@ -56,12 +56,12 @@ public class TestBlockingHARegionQueue extends HARegionQueue.TestOnlyHARegionQue
    * @throws CacheException
    * @throws InterruptedException
    * 
-   * @throws InterruptedException
+   * @return boolean whether object was successfully put onto the queue
    */
 
-  public void put(Object object) throws CacheException, InterruptedException
+  public boolean put(Object object) throws CacheException, InterruptedException
   {
-    super.put(object);
+    boolean putDone = super.put(object);
 
     if (takeFirst) {
       this.take();
@@ -71,6 +71,7 @@ public class TestBlockingHARegionQueue extends HARegionQueue.TestOnlyHARegionQue
     synchronized (forWaiting) {
       forWaiting.notifyAll();
     }
+    return putDone;
   }
 
   /**