You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original archive page) is not preserved in this copy.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/09/21 22:34:16 UTC
[geode] 01/01: GEODE-5772: fix potential cache leaks caused by reconnect
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-5772
in repository https://gitbox.apache.org/repos/asf/geode.git
commit a92c904194f117ea5a14f63bf3c0bb211fd55572
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Sep 21 15:33:21 2018 -0700
GEODE-5772: fix potential cache leaks caused by reconnect
---
.../apache/geode/distributed/ServerLauncher.java | 4 +++
.../internal/ClusterDistributionManager.java | 5 +++
.../distributed/internal/DistributionManager.java | 2 ++
.../internal/InternalDistributedSystem.java | 10 ++++++
.../internal/LonerDistributionManager.java | 2 ++
.../membership/gms/mgr/GMSMembershipManager.java | 2 +-
.../apache/geode/internal/cache/DiskStoreImpl.java | 3 +-
.../geode/internal/cache/GemFireCacheImpl.java | 1 +
.../apache/geode/internal/cache/TXManagerImpl.java | 4 ++-
.../geode/internal/cache/TombstoneService.java | 23 ++++++++-----
.../internal/cache/wan/AbstractGatewaySender.java | 5 +++
.../wan/parallel/ParallelGatewaySenderQueue.java | 38 ++++++++++------------
.../cache/wan/serial/SerialGatewaySenderQueue.java | 14 ++++----
.../cache/xmlcache/CacheServerCreation.java | 4 +--
14 files changed, 75 insertions(+), 42 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
index 9b7be48..6c2fe5d 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
@@ -347,6 +347,10 @@ public class ServerLauncher extends AbstractLauncher<String> {
return this.cache;
}
+ public void setCache(Cache cache) {
+ this.cache = cache;
+ }
+
/**
* Gets the CacheConfig object used to configure additional GemFire Cache components and features
* (e.g. PDX).
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 6b503c7..966cdc9 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -2111,6 +2111,10 @@ public class ClusterDistributionManager implements DistributionManager {
this.membershipListeners.remove(l);
}
+ public void clearMembershipListeners() {
+ this.membershipListeners.clear();
+ }
+
/**
* Adds a <code>MembershipListener</code> to this distribution manager.
*/
@@ -2280,6 +2284,7 @@ public class ClusterDistributionManager implements DistributionManager {
this.localAddress));
MembershipLogger.logShutdown(this.localAddress);
closed = true;
+ this.cache = null;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 9742822..f9e7e6f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -198,6 +198,8 @@ public interface DistributionManager extends ReplySender {
*/
void removeMembershipListener(MembershipListener l);
+ void clearMembershipListeners();
+
/**
* Removes a <code>MembershipListener</code> listening for all members from this distribution
* manager.
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 9d992a1..5d5fe8d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -67,6 +67,7 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.DurableClientAttributes;
+import org.apache.geode.distributed.ServerLauncher;
import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MembershipManager;
@@ -2693,6 +2694,11 @@ public class InternalDistributedSystem extends DistributedSystem
logger.warn("Exception disconnecting for reconnect", ee);
}
+ if (ServerLauncher.getInstance() != null) {
+ ServerLauncher.getInstance().setCache(null);
+ }
+ getDM().clearMembershipListeners();
+
try {
reconnectLock.wait(timeOut);
} catch (InterruptedException e) {
@@ -2798,6 +2804,10 @@ public class InternalDistributedSystem extends DistributedSystem
}
cache = GemFireCacheImpl.create(this.reconnectDS, config);
+ if (ServerLauncher.getInstance() != null) {
+ ServerLauncher.getInstance().setCache(cache);
+ }
+
createAndStartCacheServers(cacheServerCreation, cache);
if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 6115f7c..f265def 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -251,6 +251,8 @@ public class LonerDistributionManager implements DistributionManager {
public void removeMembershipListener(MembershipListener l) {}
+ public void clearMembershipListeners() {}
+
public void removeAllMembershipListener(MembershipListener l) {}
public void addAdminConsole(InternalDistributedMember p_id) {}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index cb19969..c015cb4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -1596,7 +1596,7 @@ public class GMSMembershipManager implements MembershipManager, Manager {
for (final Object o : cache.getCacheServers()) {
CacheServerImpl cs = (CacheServerImpl) o;
if (cs.isDefaultServer()) {
- CacheServerCreation bsc = new CacheServerCreation(cache, cs);
+ CacheServerCreation bsc = new CacheServerCreation(cache, cs, false);
list.add(bsc);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index d174157..dfb65a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -263,7 +263,7 @@ public class DiskStoreImpl implements DiskStore {
public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS;
public static volatile HashSet<String> TEST_NO_FALLOC_DIRS;
- private final InternalCache cache;
+ private InternalCache cache;
/** The stats for this store */
private final DiskStoreStats stats;
@@ -2368,6 +2368,7 @@ public class DiskStoreImpl implements DiskStore {
}
} finally {
this.closed = true;
+ this.cache = null;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 467feee..7beecdf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2202,6 +2202,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
} catch (CancelException ignore) {
}
+ ((AbstractGatewaySender) sender).resetCache();
}
destroyGatewaySenderLockService();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index b0ec44d..e2aa0f2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -92,7 +92,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
private final AtomicInteger uniqId;
private final DistributionManager dm;
- private final InternalCache cache;
+ private InternalCache cache;
// The DistributionMemberID used to construct TXId's
private final InternalDistributedMember distributionMgrId;
@@ -649,6 +649,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
if (isClosed()) {
return;
}
+ this.cache = null;
+ this.currentInstance = null;
TXStateProxy[] proxies = null;
synchronized (this.hostedTXStates) {
// After this, newly added TXStateProxy would not operate on the TXState.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
index 14c1d53..fa90fcf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
@@ -570,6 +570,10 @@ public class TombstoneService {
try {
// this thread should not reference other sweeper state, which is not synchronized
for (Map.Entry<DistributedRegion, Set<Object>> mapEntry : reapedKeys.entrySet()) {
+ if (isStopped) {
+ logger.info("expireBatch is stopped due to close");
+ break;
+ }
DistributedRegion r = mapEntry.getKey();
Set<Object> rKeysReaped = mapEntry.getValue();
r.distributeTombstoneGC(rKeysReaped);
@@ -775,7 +779,7 @@ public class TombstoneService {
protected final CachePerfStats stats;
private final CancelCriterion cancelCriterion;
- private volatile boolean isStopped;
+ protected volatile boolean isStopped;
TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
long expiryTime, String threadName) {
@@ -923,13 +927,16 @@ public class TombstoneService {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
logger.trace(LogMarker.TOMBSTONE_VERBOSE, "sleeping for {}", sleepTime);
}
- synchronized (this) {
- if (isStopped) {
- return;
- }
- try {
- this.wait(sleepTime);
- } catch (InterruptedException ignore) {
+ long then = getNow();
+ while ((getNow() - then) <= sleepTime) {
+ synchronized (this) {
+ if (isStopped) {
+ return;
+ }
+ try {
+ this.wait(500);
+ } catch (InterruptedException ignore) {
+ }
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index b099eb1..2eb7abe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -278,6 +278,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
return senderAdvisor;
}
+ public void resetCache() {
+ logger.info("Cache reference is reset for GatewaySender " + this.getId());
+ this.cache = null;
+ }
+
@Override
public GatewaySenderStats getStatistics() {
return statistics;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 057697a..d3cbd43 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -288,7 +288,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// still, it is safer approach to synchronize it
synchronized (ParallelGatewaySenderQueue.class) {
if (removalThread == null) {
- removalThread = new BatchRemovalThread(this.sender.getCache(), this);
+ removalThread = new BatchRemovalThread(this);
removalThread.start();
}
}
@@ -310,9 +310,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (this.userRegionNameToshadowPRMap.containsKey(regionName))
return;
- InternalCache cache = sender.getCache();
final String prQName = getQueueName(sender.getId(), userRegion.getFullPath());
- prQ = (PartitionedRegion) cache.getRegion(prQName);
+ prQ = (PartitionedRegion) sender.getCache().getRegion(prQName);
if (prQ == null) {
// TODO:REF:Avoid deprecated apis
AttributesFactory fact = new AttributesFactory();
@@ -356,10 +355,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
ParallelGatewaySenderQueueMetaRegion meta =
- new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender);
+ new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, sender.getCache(), sender);
try {
- prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra,
+ prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra,
new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true)
.setSnapshotInputStream(null).setImageTarget(null));
@@ -399,7 +398,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// in case shadowPR exists already (can be possible when sender is
// started from stop operation)
if (this.index == 0) // HItesh: for first processor only
- handleShadowPRExistsScenario(cache, prQ);
+ handleShadowPRExistsScenario(sender.getCache(), prQ);
}
/*
* Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is
@@ -455,11 +454,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
.toLocalizedString(new Object[] {this.sender.getId(), userPR.getFullPath()}));
}
- InternalCache cache = sender.getCache();
boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
final String prQName = sender.getId() + QSTRING + convertPathToName(userPR.getFullPath());
- prQ = (PartitionedRegion) cache.getRegion(prQName);
+ prQ = (PartitionedRegion) sender.getCache().getRegion(prQName);
if (prQ == null) {
// TODO:REF:Avoid deprecated apis
@@ -506,10 +504,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
ParallelGatewaySenderQueueMetaRegion meta =
- metaRegionFactory.newMetataRegion(cache, prQName, ra, sender);
+ metaRegionFactory.newMetataRegion(sender.getCache(), prQName, ra, sender);
try {
- prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra,
+ prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra,
new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true)
.setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null));
// at this point we should be able to assert prQ == meta;
@@ -520,7 +518,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return; // return from here if accessor node
// Add the overflow statistics to the mbean
- addOverflowStatisticsToMBean(cache, prQ);
+ addOverflowStatisticsToMBean(sender.getCache(), prQ);
// Wait for buckets to be recovered.
prQ.shadowPRWaitForBucketRecovery();
@@ -539,7 +537,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// in case shadowPR exists already (can be possible when sender is
// started from stop operation)
if (this.index == 0) // HItesh:for first parallelGatewaySenderQueue only
- handleShadowPRExistsScenario(cache, prQ);
+ handleShadowPRExistsScenario(sender.getCache(), prQ);
}
} finally {
@@ -1014,8 +1012,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
} else {
String regionPath = event.getRegionPath();
- InternalCache cache = this.sender.getCache();
- Region region = (PartitionedRegion) cache.getRegion(regionPath);
+ Region region = (PartitionedRegion) sender.getCache().getRegion(regionPath);
if (region != null && !region.isDestroyed()) {
// TODO: We have to get colocated parent region for this region
if (region instanceof DistributedRegion) {
@@ -1600,17 +1597,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
*/
private volatile boolean shutdown = false;
- private final InternalCache cache;
-
private final ParallelGatewaySenderQueue parallelQueue;
/**
* Constructor : Creates and initializes the thread
*/
- public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) {
+ public BatchRemovalThread(ParallelGatewaySenderQueue queue) {
super("BatchRemovalThread for GatewaySender_" + queue.sender.getId() + "_" + queue.index);
this.setDaemon(true);
- this.cache = c;
this.parallelQueue = queue;
}
@@ -1618,7 +1612,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (shutdown) {
return true;
}
- if (cache.getCancelCriterion().isCancelInProgress()) {
+ if (parallelQueue.sender.getCache().getCancelCriterion().isCancelInProgress()) {
return true;
}
return false;
@@ -1627,7 +1621,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
@Override
public void run() {
try {
- InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+ InternalDistributedSystem ids =
+ parallelQueue.sender.getCache().getInternalDistributedSystem();
DistributionManager dm = ids.getDistributionManager();
for (;;) {
try { // be somewhat tolerant of failures
@@ -1684,7 +1679,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
buckToDispatchLock.unlock();
}
// Get all the data-stores wherever userPRs are present
- Set<InternalDistributedMember> recipients = getAllRecipients(cache, temp);
+ Set<InternalDistributedMember> recipients =
+ getAllRecipients(parallelQueue.sender.getCache(), temp);
if (!recipients.isEmpty()) {
ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
pqrm.setRecipients(recipients);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 6cfe7f4..357d992 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -213,7 +213,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
initializeRegion(abstractSender, listener);
// Increment queue size. Fix for bug 51988.
this.stats.incQueueSize(this.region.size());
- this.removalThread = new BatchRemovalThread(abstractSender.getCache());
+ this.removalThread = new BatchRemovalThread();
this.removalThread.start();
this.sender = abstractSender;
if (logger.isDebugEnabled()) {
@@ -1042,22 +1042,19 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*/
private volatile boolean shutdown = false;
- private final InternalCache cache;
-
/**
* Constructor : Creates and initializes the thread
*
*/
- public BatchRemovalThread(InternalCache c) {
+ public BatchRemovalThread() {
this.setDaemon(true);
- this.cache = c;
}
private boolean checkCancelled() {
if (shutdown) {
return true;
}
- if (cache.getCancelCriterion().isCancelInProgress()) {
+ if (sender.getCache().getCancelCriterion().isCancelInProgress()) {
return true;
}
return false;
@@ -1065,7 +1062,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
@Override
public void run() {
- InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+ InternalDistributedSystem ids = sender.getCache().getInternalDistributedSystem();
try { // ensure exit message is printed
// Long waitTime = Long.getLong(QUEUE_REMOVAL_WAIT_TIME, 1000);
@@ -1113,7 +1110,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
// release not needed since disallowOffHeapValues called
EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY,
- (lastDestroyedKey + 1), null/* newValue */, null, false, cache.getMyId());
+ (lastDestroyedKey + 1), null/* newValue */, null, false,
+ sender.getCache().getMyId());
event.disallowOffHeapValues();
event.setTailKey(temp);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
index 07c2685..a9f2e46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
@@ -54,8 +54,8 @@ public class CacheServerCreation extends AbstractCacheServer {
* Constructor for retaining bridge server information during auto-reconnect
*
*/
- public CacheServerCreation(InternalCache cache, CacheServer other) {
- super(cache);
+ public CacheServerCreation(InternalCache cache, CacheServer other, boolean attachListener) {
+ super(cache, attachListener);
setPort(other.getPort());
setBindAddress(other.getBindAddress());
setHostnameForClients(other.getHostnameForClients());