You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/07/29 16:29:48 UTC
incubator-geode git commit: GEODE-1678: Fix offheap memory leak when
gateway sender events failed to be enqueued to GatewaySenderQueue.
Repository: incubator-geode
Updated Branches:
refs/heads/develop 547fc4886 -> bf6cf4147
GEODE-1678: Fix offheap memory leak when gateway sender events failed to be enqueued to GatewaySenderQueue.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bf6cf414
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bf6cf414
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bf6cf414
Branch: refs/heads/develop
Commit: bf6cf41474c14ea702d8d12de26c0012db4bc13f
Parents: 547fc48
Author: eshu <es...@pivotal.io>
Authored: Fri Jul 29 09:24:45 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Fri Jul 29 09:24:45 2016 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/RegionQueue.java | 3 ++-
.../internal/cache/ha/HARegionQueue.java | 16 +++++++++---
.../ConcurrentParallelGatewaySenderQueue.java | 2 +-
.../ParallelGatewaySenderEventProcessor.java | 7 +++---
.../parallel/ParallelGatewaySenderQueue.java | 25 +++++++++++--------
.../SerialGatewaySenderEventProcessor.java | 26 +++++++++++++++-----
.../wan/serial/SerialGatewaySenderQueue.java | 4 ++-
.../cache/ha/TestBlockingHARegionQueue.java | 7 +++---
8 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
index 5108861..44e03bd 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
@@ -45,8 +45,9 @@ public interface RegionQueue
*
* @throws InterruptedException
* @throws CacheException
+ * @return boolean whether object was successfully put onto the queue
*/
- public void put(Object object) throws InterruptedException, CacheException;
+ public boolean put(Object object) throws InterruptedException, CacheException;
/**
* Returns the underlying region that backs this queue.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
index 85b50a1..a9d5e6b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
@@ -573,7 +573,7 @@ public class HARegionQueue implements RegionQueue
}
/**
- * Adds an object at the queue's tail. The implemetation supports concurrent
+ * Adds an object at the queue's tail. The implementation supports concurrent
* put operations in a performant manner. This is done in following steps:
* <br>
* 1)Check if the Event being added has a sequence ID less than the Last
@@ -593,9 +593,9 @@ public class HARegionQueue implements RegionQueue
* object to put onto the queue
* @throws InterruptedException
* @throws CacheException
- *
+ * @return boolean currently always {@code true}; the value does not indicate whether the event was actually enqueued
*/
- public void put(Object object) throws CacheException, InterruptedException {
+ public boolean put(Object object) throws CacheException, InterruptedException {
this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event
try {
if (this.giiCount > 0) {
@@ -616,6 +616,16 @@ public class HARegionQueue implements RegionQueue
} finally {
this.giiLock.readLock().unlock();
}
+
+ // basicPut() invokes dace.putObject() to put the event onto the HARegionQueue.
+ // However, dace.putObject() can return true even when the event was not
+ // actually put onto the HARegionQueue (for example, because it was elided).
+ // Its result is therefore not a reliable indicator of whether ownership of the
+ // offheap reference has passed to the queue (if and when HARegionQueue uses
+ // offheap). A probable solution is to have dace.putObject() increment the
+ // offheap REF count when it puts the event onto the region queue, and to have
+ // the caller always release (decrement) its own offheap REF count.
+ return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index ccdf42a..e25f472 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -65,7 +65,7 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
}
@Override
- public void put(Object object) throws InterruptedException, CacheException {
+ public boolean put(Object object) throws InterruptedException, CacheException {
throw new UnsupportedOperationException("CPGAQ method(put) is not supported");
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 11502af..1810427 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -134,6 +134,7 @@ public class ParallelGatewaySenderEventProcessor extends
// .isPdxTypesRegion())) {
// bucketId = PartitionedRegionHelper.getHashKey(event);
// }
+ boolean queuedEvent = false;
try {
EventID eventID = ((EntryEventImpl)event).getEventId();
@@ -142,11 +143,11 @@ public class ParallelGatewaySenderEventProcessor extends
gatewayQueueEvent = new GatewaySenderEventImpl(operation, event,
substituteValue, true, eventID.getBucketID());
+
if (getSender().beforeEnqueue(gatewayQueueEvent)) {
long start = getSender().getStatistics().startTime();
try {
- this.queue.put(gatewayQueueEvent);
- gatewayQueueEvent = null;
+ queuedEvent = this.queue.put(gatewayQueueEvent);
}
catch (InterruptedException e) {
e.printStackTrace();
@@ -161,7 +162,7 @@ public class ParallelGatewaySenderEventProcessor extends
}
}
finally {
- if (gatewayQueueEvent != null) {
+ if (!queuedEvent) {
// it was not queued for some reason
gatewayQueueEvent.release();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index d21d6dc..1b5c11f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -676,9 +676,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
}
- public void put(Object object) throws InterruptedException, CacheException {
+ public boolean put(Object object) throws InterruptedException, CacheException {
final boolean isDebugEnabled = logger.isDebugEnabled();
-
+ boolean putDone = false;
//Suranjan : Can this region ever be null? Should we work with regionName and not with region instance.
// It can't be as put is happeing on the region and its still under process
GatewaySenderEventImpl value = (GatewaySenderEventImpl)object;
@@ -707,8 +707,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToshadowPRMap);
}
logger.warn(LocalizedMessage.create(LocalizedStrings.NOT_QUEUING_AS_USERPR_IS_NOT_YET_CONFIGURED, value));
- value.release();
- return;
+ //does not put into queue
+ return false;
}
PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(regionPath);
@@ -724,8 +724,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (isDebugEnabled) {
logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", key, value);
}
- value.release();
- return;
+ //does not put into queue
+ return false;
}
}else{
key = value.getEventId();
@@ -759,6 +759,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
brq.getInitializationLock().readLock().lock();
try {
putIntoBucketRegionQueue(brq, key, value);
+ putDone = true;
} finally {
brq.getInitializationLock().readLock().unlock();
}
@@ -767,7 +768,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// above search then it means that bucket is not intended for this
// node. So lets not add this event in temp queue event as we are
// doing it for PRevent
- value.release();
+ // does not put onto the queue
} else {
// We have to handle the case where brq is null because the
// colocation
@@ -781,9 +782,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.",
key, value);
}
- value.release();
+ // does not put onto the queue
} else {
- /**
+ /*
* This is to prevent data loss, in the scenario when bucket is
* not available in the cache but we know that it will be created.
*/
@@ -803,6 +804,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
brq.getInitializationLock().readLock().lock();
try {
putIntoBucketRegionQueue(brq, key, value);
+ putDone = true;
} finally {
brq.getInitializationLock().readLock().unlock();
}
@@ -813,6 +815,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// this.bucketToTempQueueMap.put(bucketId, tempQueue);
// }
tempQueue.add(value);
+ putDone = true;
// For debugging purpose.
if (isDebugEnabled) {
logger.debug("The value {} is enqueued to the tempQueue for the BucketRegionQueue.", value);
@@ -840,17 +843,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (!thisbucketDestroyed) {
putIntoBucketRegionQueue(brq, key, value);
+ putDone = true;
} else {
if (isDebugEnabled) {
logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.",
key, value);
}
- value.release();
+ // does not put onto the queue
}
}
} finally {
notifyEventProcessorIfRequired();
}
+ return putDone;
}
public void notifyEventProcessorIfRequired() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index ba839f4..2b1eb3d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -330,8 +330,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
it.remove();
boolean queuedEvent = false;
try {
- queuePrimaryEvent(gatewayEvent);
- queuedEvent = true;
+ queuedEvent = queuePrimaryEvent(gatewayEvent);
} catch (IOException ex) {
if (!stopped()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_EVENT_DROPPED_DURING_FAILOVER_0, gatewayEvent), ex);
@@ -431,11 +430,24 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
}
// If it is, create and enqueue an initialized GatewayEventImpl
senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue); // OFFHEAP ok
- queuePrimaryEvent(senderEvent);
+
+ boolean queuedEvent =false;
+ try {
+ queuedEvent = queuePrimaryEvent(senderEvent);
+ } finally {
+ // If queuePrimaryEvent() fails with an exception, the failure may have
+ // occurred after the GatewaySenderEventImpl was already put onto the queue.
+ // In that case the GatewaySenderEventImpl is still released here, and an
+ // IllegalStateException could be thrown if getDeserializedValue is called
+ // when the event is later accessed through the region queue.
+ if (!queuedEvent) {
+ GatewaySenderEventImpl.release(senderEvent);
+ }
+ }
}
}
- private void queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent)
+ private boolean queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent)
throws IOException, CacheException {
// Queue the event
GatewaySenderStats statistics = this.sender.getStatistics();
@@ -447,11 +459,12 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
logger.debug("Event {} is not added to queue.", gatewayEvent);
}
statistics.incEventsFiltered();
- return;
+ return false;
}
long start = statistics.startTime();
+ boolean putDone = false;
try {
- this.queue.put(gatewayEvent);
+ putDone = this.queue.put(gatewayEvent);
} catch (InterruptedException e) {
// Asif Not expected from SingleWriteSingleReadRegionQueue as it does not
// throw
@@ -480,6 +493,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
new Object[] { sender.getId(), Integer.valueOf(AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) }));
this.eventQueueSizeWarning = true;
}
+ return putDone;
}
protected void waitForFailoverCompletion() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index efa7870..1ae55ac 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -232,7 +232,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
getRegion().localDestroyRegion();
}
- public synchronized void put(Object event) throws CacheException {
+ public synchronized boolean put(Object event) throws CacheException {
GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl)event;
final Region r = eventImpl.getRegion();
final boolean isPDXRegion = (r instanceof DistributedRegion && r.getName()
@@ -249,9 +249,11 @@ public class SerialGatewaySenderQueue implements RegionQueue {
// else {
// synchronized (this) {
putAndGetKey(event);
+ return true;
//}
//}
}
+ return false;
}
private long putAndGetKey(Object object) throws CacheException {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bf6cf414/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
index d57290a..ced87bd 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/TestBlockingHARegionQueue.java
@@ -56,12 +56,12 @@ public class TestBlockingHARegionQueue extends HARegionQueue.TestOnlyHARegionQue
* @throws CacheException
* @throws InterruptedException
*
- * @throws InterruptedException
+ * @return boolean whether object was successfully put onto the queue
*/
- public void put(Object object) throws CacheException, InterruptedException
+ public boolean put(Object object) throws CacheException, InterruptedException
{
- super.put(object);
+ boolean putDone = super.put(object);
if (takeFirst) {
this.take();
@@ -71,6 +71,7 @@ public class TestBlockingHARegionQueue extends HARegionQueue.TestOnlyHARegionQue
synchronized (forWaiting) {
forWaiting.notifyAll();
}
+ return putDone;
}
/**