You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/09/21 22:37:50 UTC

[geode] branch feature/GEODE-5772 updated (a92c904 -> d3bb8d8)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-5772
in repository https://gitbox.apache.org/repos/asf/geode.git.


 discard a92c904  GEODE-5772: fix the potential cache leaks caused by reconnect
     add 1998a21  GEODE-5761: Exclude android-json transitive dependency (#2494)
     add be52507  GEODE-5314: Cleanup and document MBeanStatsMonitor child classes
     add cc36c91  GEODE-5600: Fix extension developer issues with buildinfo
     add 6112860  Merge pull request #2497 from rhoughton-pivot/GEODE-5600-extension-conflicts
     add 3e6c0a6  GEODE-5501: add logging in test base for debug
     add 6984baa  GEODE-5761: Fix checkPom for exclusion of android-json
     add efb2e29  GEODE-5765: Clean up soft references faster in dunit tests
     add 6006ec3  GEODE-5747: Handling SocketException in InternalDataSerializer (#2484)
     add cd17350  GEODE-5551: Replace thread interleaving mechanism in LoginHandler...Test
     new d3bb8d8  GEODE-5772: fix the potential cache leaks caused by reconnect Co-authored-by Barry Oglesby

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a92c904)
            \
             N -- N -- N   refs/heads/feature/GEODE-5772 (d3bb8d8)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.gradle                                       |   6 +-
 geode-core/build.gradle                            |  10 +-
 .../cache/NetSearchMessagingDUnitTest.java         | 241 +++++++------
 .../cache/PartitionedRegionMultipleDUnitTest.java  |  10 +-
 .../cache/versions/TombstoneDUnitTest.java         |  20 +-
 .../geode/internal/InternalDataSerializer.java     |   3 +-
 .../beans/stats/AggregateRegionStatsMonitor.java   | 319 +++++++++---------
 .../internal/beans/stats/GCStatsMonitor.java       |  73 ++--
 .../beans/stats/GatewaySenderOverflowMonitor.java  | 168 ++++++----
 .../beans/stats/MemberLevelDiskMonitor.java        | 371 +++++++++++----------
 .../internal/beans/stats/VMStatsMonitor.java       | 114 ++++---
 .../internal/InternalDataSerializerJUnitTest.java  |  19 ++
 .../stats/AggregateRegionStatsMonitorTest.java     | 244 ++++++++++++++
 .../internal/beans/stats/GCStatsMonitorTest.java   |  84 +++++
 .../stats/GatewaySenderOverflowMonitorTest.java    | 177 ++++++++++
 .../beans/stats/MemberLevelDiskMonitorTest.java    | 230 +++++++++++++
 .../internal/beans/stats/VMStatsMonitorTest.java   | 142 ++++++++
 .../dunit/internal/JUnit4DistributedTestCase.java  |   7 +-
 .../test/dunit/standalone/ProcessManager.java      |   1 +
 geode-junit/build.gradle                           |   4 +-
 geode-junit/src/test/resources/expected-pom.xml    |   6 +
 geode-web/build.gradle                             |   4 +-
 .../support/LoginHandlerInterceptorJUnitTest.java  | 332 ++++++++----------
 23 files changed, 1764 insertions(+), 821 deletions(-)
 create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitorTest.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitorTest.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitorTest.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitorTest.java


[geode] 01/01: GEODE-5772: fix the potential cache leaks caused by reconnect Co-authored-by Barry Oglesby

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-5772
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d3bb8d848732428cbfa420cfd82a0b345e09599b
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Sep 21 15:33:21 2018 -0700

    GEODE-5772: fix the potential cache leaks caused by reconnect
    Co-authored-by Barry Oglesby
---
 .../apache/geode/distributed/ServerLauncher.java   |  4 +++
 .../internal/ClusterDistributionManager.java       |  5 +++
 .../distributed/internal/DistributionManager.java  |  2 ++
 .../internal/InternalDistributedSystem.java        | 10 ++++++
 .../internal/LonerDistributionManager.java         |  2 ++
 .../membership/gms/mgr/GMSMembershipManager.java   |  2 +-
 .../apache/geode/internal/cache/DiskStoreImpl.java |  3 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |  1 +
 .../apache/geode/internal/cache/TXManagerImpl.java |  4 ++-
 .../geode/internal/cache/TombstoneService.java     | 23 ++++++++-----
 .../internal/cache/wan/AbstractGatewaySender.java  |  5 +++
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 38 ++++++++++------------
 .../cache/wan/serial/SerialGatewaySenderQueue.java | 14 ++++----
 .../cache/xmlcache/CacheServerCreation.java        |  4 +--
 14 files changed, 75 insertions(+), 42 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
index 9b7be48..6c2fe5d 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
@@ -347,6 +347,10 @@ public class ServerLauncher extends AbstractLauncher<String> {
     return this.cache;
   }
 
+  public void setCache(Cache cache) {
+    this.cache = cache;
+  }
+
   /**
    * Gets the CacheConfig object used to configure additional GemFire Cache components and features
    * (e.g. PDX).
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 6b503c7..966cdc9 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -2111,6 +2111,10 @@ public class ClusterDistributionManager implements DistributionManager {
     this.membershipListeners.remove(l);
   }
 
+  public void clearMembershipListeners() {
+    this.membershipListeners.clear();
+  }
+
   /**
    * Adds a <code>MembershipListener</code> to this distribution manager.
    */
@@ -2280,6 +2284,7 @@ public class ClusterDistributionManager implements DistributionManager {
           this.localAddress));
       MembershipLogger.logShutdown(this.localAddress);
       closed = true;
+      this.cache = null;
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 9742822..f9e7e6f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -198,6 +198,8 @@ public interface DistributionManager extends ReplySender {
    */
   void removeMembershipListener(MembershipListener l);
 
+  void clearMembershipListeners();
+
   /**
    * Removes a <code>MembershipListener</code> listening for all members from this distribution
    * manager.
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 9d992a1..5d5fe8d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -67,6 +67,7 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.DurableClientAttributes;
+import org.apache.geode.distributed.ServerLauncher;
 import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
@@ -2693,6 +2694,11 @@ public class InternalDistributedSystem extends DistributedSystem
           logger.warn("Exception disconnecting for reconnect", ee);
         }
 
+        if (ServerLauncher.getInstance() != null) {
+          ServerLauncher.getInstance().setCache(null);
+        }
+        getDM().clearMembershipListeners();
+
         try {
           reconnectLock.wait(timeOut);
         } catch (InterruptedException e) {
@@ -2798,6 +2804,10 @@ public class InternalDistributedSystem extends DistributedSystem
               }
               cache = GemFireCacheImpl.create(this.reconnectDS, config);
 
+              if (ServerLauncher.getInstance() != null) {
+                ServerLauncher.getInstance().setCache(cache);
+              }
+
               createAndStartCacheServers(cacheServerCreation, cache);
 
               if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 6115f7c..f265def 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -251,6 +251,8 @@ public class LonerDistributionManager implements DistributionManager {
 
   public void removeMembershipListener(MembershipListener l) {}
 
+  public void clearMembershipListeners() {}
+
   public void removeAllMembershipListener(MembershipListener l) {}
 
   public void addAdminConsole(InternalDistributedMember p_id) {}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index cb19969..c015cb4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -1596,7 +1596,7 @@ public class GMSMembershipManager implements MembershipManager, Manager {
         for (final Object o : cache.getCacheServers()) {
           CacheServerImpl cs = (CacheServerImpl) o;
           if (cs.isDefaultServer()) {
-            CacheServerCreation bsc = new CacheServerCreation(cache, cs);
+            CacheServerCreation bsc = new CacheServerCreation(cache, cs, false);
             list.add(bsc);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index d174157..dfb65a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -263,7 +263,7 @@ public class DiskStoreImpl implements DiskStore {
   public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS;
   public static volatile HashSet<String> TEST_NO_FALLOC_DIRS;
 
-  private final InternalCache cache;
+  private InternalCache cache;
 
   /** The stats for this store */
   private final DiskStoreStats stats;
@@ -2368,6 +2368,7 @@ public class DiskStoreImpl implements DiskStore {
       }
     } finally {
       this.closed = true;
+      this.cache = null;
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 467feee..7beecdf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2202,6 +2202,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
             }
           } catch (CancelException ignore) {
           }
+          ((AbstractGatewaySender) sender).resetCache();
         }
 
         destroyGatewaySenderLockService();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index b0ec44d..e2aa0f2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -92,7 +92,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
   private final AtomicInteger uniqId;
 
   private final DistributionManager dm;
-  private final InternalCache cache;
+  private InternalCache cache;
 
   // The DistributionMemberID used to construct TXId's
   private final InternalDistributedMember distributionMgrId;
@@ -649,6 +649,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     if (isClosed()) {
       return;
     }
+    this.cache = null;
+    this.currentInstance = null;
     TXStateProxy[] proxies = null;
     synchronized (this.hostedTXStates) {
       // After this, newly added TXStateProxy would not operate on the TXState.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
index 14c1d53..fa90fcf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
@@ -570,6 +570,10 @@ public class TombstoneService {
               try {
                 // this thread should not reference other sweeper state, which is not synchronized
                 for (Map.Entry<DistributedRegion, Set<Object>> mapEntry : reapedKeys.entrySet()) {
+                  if (isStopped) {
+                    logger.info("expireBatch is stopped due to close");
+                    break;
+                  }
                   DistributedRegion r = mapEntry.getKey();
                   Set<Object> rKeysReaped = mapEntry.getValue();
                   r.distributeTombstoneGC(rKeysReaped);
@@ -775,7 +779,7 @@ public class TombstoneService {
     protected final CachePerfStats stats;
     private final CancelCriterion cancelCriterion;
 
-    private volatile boolean isStopped;
+    protected volatile boolean isStopped;
 
     TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
         long expiryTime, String threadName) {
@@ -923,13 +927,16 @@ public class TombstoneService {
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
         logger.trace(LogMarker.TOMBSTONE_VERBOSE, "sleeping for {}", sleepTime);
       }
-      synchronized (this) {
-        if (isStopped) {
-          return;
-        }
-        try {
-          this.wait(sleepTime);
-        } catch (InterruptedException ignore) {
+      long then = getNow();
+      while ((getNow() - then) <= sleepTime) {
+        synchronized (this) {
+          if (isStopped) {
+            return;
+          }
+          try {
+            this.wait(500);
+          } catch (InterruptedException ignore) {
+          }
         }
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index b099eb1..2eb7abe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -278,6 +278,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
     return senderAdvisor;
   }
 
+  public void resetCache() {
+    logger.info("Cache reference is reset for GatewaySender " + this.getId());
+    this.cache = null;
+  }
+
   @Override
   public GatewaySenderStats getStatistics() {
     return statistics;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 057697a..d3cbd43 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -288,7 +288,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     // still, it is safer approach to synchronize it
     synchronized (ParallelGatewaySenderQueue.class) {
       if (removalThread == null) {
-        removalThread = new BatchRemovalThread(this.sender.getCache(), this);
+        removalThread = new BatchRemovalThread(this);
         removalThread.start();
       }
     }
@@ -310,9 +310,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       if (this.userRegionNameToshadowPRMap.containsKey(regionName))
         return;
 
-      InternalCache cache = sender.getCache();
       final String prQName = getQueueName(sender.getId(), userRegion.getFullPath());
-      prQ = (PartitionedRegion) cache.getRegion(prQName);
+      prQ = (PartitionedRegion) sender.getCache().getRegion(prQName);
       if (prQ == null) {
         // TODO:REF:Avoid deprecated apis
         AttributesFactory fact = new AttributesFactory();
@@ -356,10 +355,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         }
 
         ParallelGatewaySenderQueueMetaRegion meta =
-            new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender);
+            new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, sender.getCache(), sender);
 
         try {
-          prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra,
+          prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra,
               new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true)
                   .setSnapshotInputStream(null).setImageTarget(null));
 
@@ -399,7 +398,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         // in case shadowPR exists already (can be possible when sender is
         // started from stop operation)
         if (this.index == 0) // HItesh: for first processor only
-          handleShadowPRExistsScenario(cache, prQ);
+          handleShadowPRExistsScenario(sender.getCache(), prQ);
       }
       /*
        * Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is
@@ -455,11 +454,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 .toLocalizedString(new Object[] {this.sender.getId(), userPR.getFullPath()}));
       }
 
-      InternalCache cache = sender.getCache();
       boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
 
       final String prQName = sender.getId() + QSTRING + convertPathToName(userPR.getFullPath());
-      prQ = (PartitionedRegion) cache.getRegion(prQName);
+      prQ = (PartitionedRegion) sender.getCache().getRegion(prQName);
       if (prQ == null) {
         // TODO:REF:Avoid deprecated apis
 
@@ -506,10 +504,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         }
 
         ParallelGatewaySenderQueueMetaRegion meta =
-            metaRegionFactory.newMetataRegion(cache, prQName, ra, sender);
+            metaRegionFactory.newMetataRegion(sender.getCache(), prQName, ra, sender);
 
         try {
-          prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra,
+          prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra,
               new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true)
                   .setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null));
           // at this point we should be able to assert prQ == meta;
@@ -520,7 +518,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             return; // return from here if accessor node
 
           // Add the overflow statistics to the mbean
-          addOverflowStatisticsToMBean(cache, prQ);
+          addOverflowStatisticsToMBean(sender.getCache(), prQ);
 
           // Wait for buckets to be recovered.
           prQ.shadowPRWaitForBucketRecovery();
@@ -539,7 +537,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         // in case shadowPR exists already (can be possible when sender is
         // started from stop operation)
         if (this.index == 0) // HItesh:for first parallelGatewaySenderQueue only
-          handleShadowPRExistsScenario(cache, prQ);
+          handleShadowPRExistsScenario(sender.getCache(), prQ);
       }
 
     } finally {
@@ -1014,8 +1012,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           }
         } else {
           String regionPath = event.getRegionPath();
-          InternalCache cache = this.sender.getCache();
-          Region region = (PartitionedRegion) cache.getRegion(regionPath);
+          Region region = (PartitionedRegion) sender.getCache().getRegion(regionPath);
           if (region != null && !region.isDestroyed()) {
             // TODO: We have to get colocated parent region for this region
             if (region instanceof DistributedRegion) {
@@ -1600,17 +1597,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
      */
     private volatile boolean shutdown = false;
 
-    private final InternalCache cache;
-
     private final ParallelGatewaySenderQueue parallelQueue;
 
     /**
      * Constructor : Creates and initializes the thread
      */
-    public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) {
+    public BatchRemovalThread(ParallelGatewaySenderQueue queue) {
       super("BatchRemovalThread for GatewaySender_" + queue.sender.getId() + "_" + queue.index);
       this.setDaemon(true);
-      this.cache = c;
       this.parallelQueue = queue;
     }
 
@@ -1618,7 +1612,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       if (shutdown) {
         return true;
       }
-      if (cache.getCancelCriterion().isCancelInProgress()) {
+      if (parallelQueue.sender.getCache().getCancelCriterion().isCancelInProgress()) {
         return true;
       }
       return false;
@@ -1627,7 +1621,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     @Override
     public void run() {
       try {
-        InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+        InternalDistributedSystem ids =
+            parallelQueue.sender.getCache().getInternalDistributedSystem();
         DistributionManager dm = ids.getDistributionManager();
         for (;;) {
           try { // be somewhat tolerant of failures
@@ -1684,7 +1679,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
               buckToDispatchLock.unlock();
             }
             // Get all the data-stores wherever userPRs are present
-            Set<InternalDistributedMember> recipients = getAllRecipients(cache, temp);
+            Set<InternalDistributedMember> recipients =
+                getAllRecipients(parallelQueue.sender.getCache(), temp);
             if (!recipients.isEmpty()) {
               ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
               pqrm.setRecipients(recipients);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 6cfe7f4..357d992 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -213,7 +213,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     initializeRegion(abstractSender, listener);
     // Increment queue size. Fix for bug 51988.
     this.stats.incQueueSize(this.region.size());
-    this.removalThread = new BatchRemovalThread(abstractSender.getCache());
+    this.removalThread = new BatchRemovalThread();
     this.removalThread.start();
     this.sender = abstractSender;
     if (logger.isDebugEnabled()) {
@@ -1042,22 +1042,19 @@ public class SerialGatewaySenderQueue implements RegionQueue {
      */
     private volatile boolean shutdown = false;
 
-    private final InternalCache cache;
-
     /**
      * Constructor : Creates and initializes the thread
      *
      */
-    public BatchRemovalThread(InternalCache c) {
+    public BatchRemovalThread() {
       this.setDaemon(true);
-      this.cache = c;
     }
 
     private boolean checkCancelled() {
       if (shutdown) {
         return true;
       }
-      if (cache.getCancelCriterion().isCancelInProgress()) {
+      if (sender.getCache().getCancelCriterion().isCancelInProgress()) {
         return true;
       }
       return false;
@@ -1065,7 +1062,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
 
     @Override
     public void run() {
-      InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+      InternalDistributedSystem ids = sender.getCache().getInternalDistributedSystem();
 
       try { // ensure exit message is printed
         // Long waitTime = Long.getLong(QUEUE_REMOVAL_WAIT_TIME, 1000);
@@ -1113,7 +1110,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
             }
             // release not needed since disallowOffHeapValues called
             EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY,
-                (lastDestroyedKey + 1), null/* newValue */, null, false, cache.getMyId());
+                (lastDestroyedKey + 1), null/* newValue */, null, false,
+                sender.getCache().getMyId());
             event.disallowOffHeapValues();
             event.setTailKey(temp);
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
index 07c2685..a9f2e46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
@@ -54,8 +54,8 @@ public class CacheServerCreation extends AbstractCacheServer {
    * Constructor for retaining bridge server information during auto-reconnect
    *
    */
-  public CacheServerCreation(InternalCache cache, CacheServer other) {
-    super(cache);
+  public CacheServerCreation(InternalCache cache, CacheServer other, boolean attachListener) {
+    super(cache, attachListener);
     setPort(other.getPort());
     setBindAddress(other.getBindAddress());
     setHostnameForClients(other.getHostnameForClients());