You are viewing a plain-text version of this content; the canonical version is available at the original mailing-list archive link.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/09/21 22:34:16 UTC

[geode] 01/01: GEODE-5772: fix the potential cache leaks caused by reconnect

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-5772
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a92c904194f117ea5a14f63bf3c0bb211fd55572
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Sep 21 15:33:21 2018 -0700

    GEODE-5772: fix the potential cache leaks caused by reconnect
---
 .../apache/geode/distributed/ServerLauncher.java   |  4 +++
 .../internal/ClusterDistributionManager.java       |  5 +++
 .../distributed/internal/DistributionManager.java  |  2 ++
 .../internal/InternalDistributedSystem.java        | 10 ++++++
 .../internal/LonerDistributionManager.java         |  2 ++
 .../membership/gms/mgr/GMSMembershipManager.java   |  2 +-
 .../apache/geode/internal/cache/DiskStoreImpl.java |  3 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |  1 +
 .../apache/geode/internal/cache/TXManagerImpl.java |  4 ++-
 .../geode/internal/cache/TombstoneService.java     | 23 ++++++++-----
 .../internal/cache/wan/AbstractGatewaySender.java  |  5 +++
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 38 ++++++++++------------
 .../cache/wan/serial/SerialGatewaySenderQueue.java | 14 ++++----
 .../cache/xmlcache/CacheServerCreation.java        |  4 +--
 14 files changed, 75 insertions(+), 42 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
index 9b7be48..6c2fe5d 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
@@ -347,6 +347,10 @@ public class ServerLauncher extends AbstractLauncher<String> {
     return this.cache;
   }
 
+  public void setCache(Cache cache) {
+    this.cache = cache;
+  }
+
   /**
    * Gets the CacheConfig object used to configure additional GemFire Cache components and features
    * (e.g. PDX).
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 6b503c7..966cdc9 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -2111,6 +2111,10 @@ public class ClusterDistributionManager implements DistributionManager {
     this.membershipListeners.remove(l);
   }
 
+  public void clearMembershipListeners() {
+    this.membershipListeners.clear();
+  }
+
   /**
    * Adds a <code>MembershipListener</code> to this distribution manager.
    */
@@ -2280,6 +2284,7 @@ public class ClusterDistributionManager implements DistributionManager {
           this.localAddress));
       MembershipLogger.logShutdown(this.localAddress);
       closed = true;
+      this.cache = null;
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 9742822..f9e7e6f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -198,6 +198,8 @@ public interface DistributionManager extends ReplySender {
    */
   void removeMembershipListener(MembershipListener l);
 
+  void clearMembershipListeners();
+
   /**
    * Removes a <code>MembershipListener</code> listening for all members from this distribution
    * manager.
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 9d992a1..5d5fe8d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -67,6 +67,7 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.DurableClientAttributes;
+import org.apache.geode.distributed.ServerLauncher;
 import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
@@ -2693,6 +2694,11 @@ public class InternalDistributedSystem extends DistributedSystem
           logger.warn("Exception disconnecting for reconnect", ee);
         }
 
+        if (ServerLauncher.getInstance() != null) {
+          ServerLauncher.getInstance().setCache(null);
+        }
+        getDM().clearMembershipListeners();
+
         try {
           reconnectLock.wait(timeOut);
         } catch (InterruptedException e) {
@@ -2798,6 +2804,10 @@ public class InternalDistributedSystem extends DistributedSystem
               }
               cache = GemFireCacheImpl.create(this.reconnectDS, config);
 
+              if (ServerLauncher.getInstance() != null) {
+                ServerLauncher.getInstance().setCache(cache);
+              }
+
               createAndStartCacheServers(cacheServerCreation, cache);
 
               if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 6115f7c..f265def 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -251,6 +251,8 @@ public class LonerDistributionManager implements DistributionManager {
 
   public void removeMembershipListener(MembershipListener l) {}
 
+  public void clearMembershipListeners() {}
+
   public void removeAllMembershipListener(MembershipListener l) {}
 
   public void addAdminConsole(InternalDistributedMember p_id) {}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index cb19969..c015cb4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -1596,7 +1596,7 @@ public class GMSMembershipManager implements MembershipManager, Manager {
         for (final Object o : cache.getCacheServers()) {
           CacheServerImpl cs = (CacheServerImpl) o;
           if (cs.isDefaultServer()) {
-            CacheServerCreation bsc = new CacheServerCreation(cache, cs);
+            CacheServerCreation bsc = new CacheServerCreation(cache, cs, false);
             list.add(bsc);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index d174157..dfb65a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -263,7 +263,7 @@ public class DiskStoreImpl implements DiskStore {
   public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS;
   public static volatile HashSet<String> TEST_NO_FALLOC_DIRS;
 
-  private final InternalCache cache;
+  private InternalCache cache;
 
   /** The stats for this store */
   private final DiskStoreStats stats;
@@ -2368,6 +2368,7 @@ public class DiskStoreImpl implements DiskStore {
       }
     } finally {
       this.closed = true;
+      this.cache = null;
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 467feee..7beecdf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2202,6 +2202,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
             }
           } catch (CancelException ignore) {
           }
+          ((AbstractGatewaySender) sender).resetCache();
         }
 
         destroyGatewaySenderLockService();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index b0ec44d..e2aa0f2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -92,7 +92,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
   private final AtomicInteger uniqId;
 
   private final DistributionManager dm;
-  private final InternalCache cache;
+  private InternalCache cache;
 
   // The DistributionMemberID used to construct TXId's
   private final InternalDistributedMember distributionMgrId;
@@ -649,6 +649,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     if (isClosed()) {
       return;
     }
+    this.cache = null;
+    this.currentInstance = null;
     TXStateProxy[] proxies = null;
     synchronized (this.hostedTXStates) {
       // After this, newly added TXStateProxy would not operate on the TXState.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
index 14c1d53..fa90fcf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
@@ -570,6 +570,10 @@ public class TombstoneService {
               try {
                 // this thread should not reference other sweeper state, which is not synchronized
                 for (Map.Entry<DistributedRegion, Set<Object>> mapEntry : reapedKeys.entrySet()) {
+                  if (isStopped) {
+                    logger.info("expireBatch is stopped due to close");
+                    break;
+                  }
                   DistributedRegion r = mapEntry.getKey();
                   Set<Object> rKeysReaped = mapEntry.getValue();
                   r.distributeTombstoneGC(rKeysReaped);
@@ -775,7 +779,7 @@ public class TombstoneService {
     protected final CachePerfStats stats;
     private final CancelCriterion cancelCriterion;
 
-    private volatile boolean isStopped;
+    protected volatile boolean isStopped;
 
     TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
         long expiryTime, String threadName) {
@@ -923,13 +927,16 @@ public class TombstoneService {
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
         logger.trace(LogMarker.TOMBSTONE_VERBOSE, "sleeping for {}", sleepTime);
       }
-      synchronized (this) {
-        if (isStopped) {
-          return;
-        }
-        try {
-          this.wait(sleepTime);
-        } catch (InterruptedException ignore) {
+      long then = getNow();
+      while ((getNow() - then) <= sleepTime) {
+        synchronized (this) {
+          if (isStopped) {
+            return;
+          }
+          try {
+            this.wait(500);
+          } catch (InterruptedException ignore) {
+          }
         }
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index b099eb1..2eb7abe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -278,6 +278,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
     return senderAdvisor;
   }
 
+  public void resetCache() {
+    logger.info("Cache reference is reset for GatewaySender " + this.getId());
+    this.cache = null;
+  }
+
   @Override
   public GatewaySenderStats getStatistics() {
     return statistics;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 057697a..d3cbd43 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -288,7 +288,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     // still, it is safer approach to synchronize it
     synchronized (ParallelGatewaySenderQueue.class) {
       if (removalThread == null) {
-        removalThread = new BatchRemovalThread(this.sender.getCache(), this);
+        removalThread = new BatchRemovalThread(this);
         removalThread.start();
       }
     }
@@ -310,9 +310,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       if (this.userRegionNameToshadowPRMap.containsKey(regionName))
         return;
 
-      InternalCache cache = sender.getCache();
       final String prQName = getQueueName(sender.getId(), userRegion.getFullPath());
-      prQ = (PartitionedRegion) cache.getRegion(prQName);
+      prQ = (PartitionedRegion) sender.getCache().getRegion(prQName);
       if (prQ == null) {
         // TODO:REF:Avoid deprecated apis
         AttributesFactory fact = new AttributesFactory();
@@ -356,10 +355,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         }
 
         ParallelGatewaySenderQueueMetaRegion meta =
-            new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender);
+            new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, sender.getCache(), sender);
 
         try {
-          prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra,
+          prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra,
               new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true)
                   .setSnapshotInputStream(null).setImageTarget(null));
 
@@ -399,7 +398,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         // in case shadowPR exists already (can be possible when sender is
         // started from stop operation)
         if (this.index == 0) // HItesh: for first processor only
-          handleShadowPRExistsScenario(cache, prQ);
+          handleShadowPRExistsScenario(sender.getCache(), prQ);
       }
       /*
        * Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is
@@ -455,11 +454,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 .toLocalizedString(new Object[] {this.sender.getId(), userPR.getFullPath()}));
       }
 
-      InternalCache cache = sender.getCache();
       boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
 
       final String prQName = sender.getId() + QSTRING + convertPathToName(userPR.getFullPath());
-      prQ = (PartitionedRegion) cache.getRegion(prQName);
+      prQ = (PartitionedRegion) sender.getCache().getRegion(prQName);
       if (prQ == null) {
         // TODO:REF:Avoid deprecated apis
 
@@ -506,10 +504,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         }
 
         ParallelGatewaySenderQueueMetaRegion meta =
-            metaRegionFactory.newMetataRegion(cache, prQName, ra, sender);
+            metaRegionFactory.newMetataRegion(sender.getCache(), prQName, ra, sender);
 
         try {
-          prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra,
+          prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra,
               new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true)
                   .setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null));
           // at this point we should be able to assert prQ == meta;
@@ -520,7 +518,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             return; // return from here if accessor node
 
           // Add the overflow statistics to the mbean
-          addOverflowStatisticsToMBean(cache, prQ);
+          addOverflowStatisticsToMBean(sender.getCache(), prQ);
 
           // Wait for buckets to be recovered.
           prQ.shadowPRWaitForBucketRecovery();
@@ -539,7 +537,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         // in case shadowPR exists already (can be possible when sender is
         // started from stop operation)
         if (this.index == 0) // HItesh:for first parallelGatewaySenderQueue only
-          handleShadowPRExistsScenario(cache, prQ);
+          handleShadowPRExistsScenario(sender.getCache(), prQ);
       }
 
     } finally {
@@ -1014,8 +1012,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           }
         } else {
           String regionPath = event.getRegionPath();
-          InternalCache cache = this.sender.getCache();
-          Region region = (PartitionedRegion) cache.getRegion(regionPath);
+          Region region = (PartitionedRegion) sender.getCache().getRegion(regionPath);
           if (region != null && !region.isDestroyed()) {
             // TODO: We have to get colocated parent region for this region
             if (region instanceof DistributedRegion) {
@@ -1600,17 +1597,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
      */
     private volatile boolean shutdown = false;
 
-    private final InternalCache cache;
-
     private final ParallelGatewaySenderQueue parallelQueue;
 
     /**
      * Constructor : Creates and initializes the thread
      */
-    public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) {
+    public BatchRemovalThread(ParallelGatewaySenderQueue queue) {
       super("BatchRemovalThread for GatewaySender_" + queue.sender.getId() + "_" + queue.index);
       this.setDaemon(true);
-      this.cache = c;
       this.parallelQueue = queue;
     }
 
@@ -1618,7 +1612,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       if (shutdown) {
         return true;
       }
-      if (cache.getCancelCriterion().isCancelInProgress()) {
+      if (parallelQueue.sender.getCache().getCancelCriterion().isCancelInProgress()) {
         return true;
       }
       return false;
@@ -1627,7 +1621,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     @Override
     public void run() {
       try {
-        InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+        InternalDistributedSystem ids =
+            parallelQueue.sender.getCache().getInternalDistributedSystem();
         DistributionManager dm = ids.getDistributionManager();
         for (;;) {
           try { // be somewhat tolerant of failures
@@ -1684,7 +1679,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
               buckToDispatchLock.unlock();
             }
             // Get all the data-stores wherever userPRs are present
-            Set<InternalDistributedMember> recipients = getAllRecipients(cache, temp);
+            Set<InternalDistributedMember> recipients =
+                getAllRecipients(parallelQueue.sender.getCache(), temp);
             if (!recipients.isEmpty()) {
               ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
               pqrm.setRecipients(recipients);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 6cfe7f4..357d992 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -213,7 +213,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     initializeRegion(abstractSender, listener);
     // Increment queue size. Fix for bug 51988.
     this.stats.incQueueSize(this.region.size());
-    this.removalThread = new BatchRemovalThread(abstractSender.getCache());
+    this.removalThread = new BatchRemovalThread();
     this.removalThread.start();
     this.sender = abstractSender;
     if (logger.isDebugEnabled()) {
@@ -1042,22 +1042,19 @@ public class SerialGatewaySenderQueue implements RegionQueue {
      */
     private volatile boolean shutdown = false;
 
-    private final InternalCache cache;
-
     /**
      * Constructor : Creates and initializes the thread
      *
      */
-    public BatchRemovalThread(InternalCache c) {
+    public BatchRemovalThread() {
       this.setDaemon(true);
-      this.cache = c;
     }
 
     private boolean checkCancelled() {
       if (shutdown) {
         return true;
       }
-      if (cache.getCancelCriterion().isCancelInProgress()) {
+      if (sender.getCache().getCancelCriterion().isCancelInProgress()) {
         return true;
       }
       return false;
@@ -1065,7 +1062,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
 
     @Override
     public void run() {
-      InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+      InternalDistributedSystem ids = sender.getCache().getInternalDistributedSystem();
 
       try { // ensure exit message is printed
         // Long waitTime = Long.getLong(QUEUE_REMOVAL_WAIT_TIME, 1000);
@@ -1113,7 +1110,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
             }
             // release not needed since disallowOffHeapValues called
             EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY,
-                (lastDestroyedKey + 1), null/* newValue */, null, false, cache.getMyId());
+                (lastDestroyedKey + 1), null/* newValue */, null, false,
+                sender.getCache().getMyId());
             event.disallowOffHeapValues();
             event.setTailKey(temp);
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
index 07c2685..a9f2e46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
@@ -54,8 +54,8 @@ public class CacheServerCreation extends AbstractCacheServer {
    * Constructor for retaining bridge server information during auto-reconnect
    *
    */
-  public CacheServerCreation(InternalCache cache, CacheServer other) {
-    super(cache);
+  public CacheServerCreation(InternalCache cache, CacheServer other, boolean attachListener) {
+    super(cache, attachListener);
     setPort(other.getPort());
     setBindAddress(other.getBindAddress());
     setHostnameForClients(other.getHostnameForClients());