You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/04/21 19:54:59 UTC
[2/3] incubator-geode git commit: GEODE-1238: ensure EntryEventImpls are released
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 049f7df..c75286e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -252,6 +252,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
import com.gemstone.gemfire.internal.util.TransformUtils;
@@ -2341,7 +2342,7 @@ public class PartitionedRegion extends LocalRegion implements
if (isDebugEnabled) {
logger.debug("PR.postPutAll encountered exception at sendMsgByBucket, ",ex);
}
- EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
+ @Released EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
try {
partialKeys.saveFailedKey(firstEvent.getKey(), ex);
} finally {
@@ -2442,7 +2443,7 @@ public class PartitionedRegion extends LocalRegion implements
if (isDebugEnabled) {
logger.debug("PR.postRemoveAll encountered exception at sendMsgByBucket, ",ex);
}
- EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
+ @Released EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
try {
partialKeys.saveFailedKey(firstEvent.getKey(), ex);
} finally {
@@ -2489,7 +2490,7 @@ public class PartitionedRegion extends LocalRegion implements
final boolean isDebugEnabled = logger.isDebugEnabled();
// retry the put remotely until it finds the right node managing the bucket
- EntryEventImpl event = prMsg.getFirstEvent(this);
+ @Released EntryEventImpl event = prMsg.getFirstEvent(this);
try {
RetryTimeKeeper retryTime = null;
InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId.intValue(), null);
@@ -2626,7 +2627,8 @@ public class PartitionedRegion extends LocalRegion implements
private VersionedObjectList sendMsgByBucket(final Integer bucketId, RemoveAllPRMessage prMsg)
{
// retry the put remotely until it finds the right node managing the bucket
- EntryEventImpl event = prMsg.getFirstEvent(this);
+ @Released EntryEventImpl event = prMsg.getFirstEvent(this);
+ try {
RetryTimeKeeper retryTime = null;
InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId.intValue(), null);
if (logger.isDebugEnabled()) {
@@ -2751,6 +2753,9 @@ public class PartitionedRegion extends LocalRegion implements
this.prStats.incRemoveAllRetries();
} // for
// NOTREACHED
+ } finally {
+ event.release();
+ }
}
public VersionedObjectList tryToSendOnePutAllMessage(PutAllPRMessage prMsg,InternalDistributedMember currentTarget) throws DataLocationException {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index 7b4504d..f0a6543 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -44,6 +44,7 @@ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
/**
* Internal implementation of {@link RegionMap}for regions whose DataPolicy is
@@ -279,7 +280,7 @@ final class ProxyRegionMap implements RegionMap {
if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
!inTokenMode)) {
// fix for bug 39526
- EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, op,
+ @Released EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
boolean cbEventInPending = false;
try {
@@ -310,7 +311,7 @@ final class ProxyRegionMap implements RegionMap {
this.owner.isInitialized())) {
// fix for bug 39526
boolean cbEventInPending = false;
- EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner,
+ @Released EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
try {
@@ -344,7 +345,7 @@ final class ProxyRegionMap implements RegionMap {
this.owner.isInitialized())) {
// fix for bug 39526
boolean cbEventInPending = false;
- EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, putOp, key,
+ @Released EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, putOp, key,
newValue, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
try {
AbstractRegionMap.switchEventOwnerAndOriginRemote(e, txEntryState == null);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
index fa7aea1..2d5a698 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
@@ -18,6 +18,7 @@ package com.gemstone.gemfire.internal.cache;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.*;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.distributed.DistributedMember;
@@ -80,7 +81,7 @@ public class QueuedOperation
else {
// it is an entry operation
//TODO :EventID should be passed from the sender & should be reused here
- EntryEventImpl ee = EntryEventImpl.create(
+ @Released EntryEventImpl ee = EntryEventImpl.create(
lr, this.op, this.key, null,
this.cbArg, true, src);
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
index 4a8c101..750953a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
@@ -57,6 +57,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES;
@@ -359,7 +360,7 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(r);
}
- EntryEventImpl event = null;
+ @Released EntryEventImpl event = null;
try {
if (this.bridgeContext != null) {
event = EntryEventImpl.create(r, getOperation(), getKey(), null/*newValue*/,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
index ee01c9b..c4ed1cc 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
@@ -51,6 +51,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+
import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES;
import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_SERIALIZED_OBJECT;
@@ -189,7 +191,7 @@ public final class RemoteInvalidateMessage extends RemoteDestroyMessage {
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)key).setRegionContext(r);
}
- final EntryEventImpl event = EntryEventImpl.create(
+ @Released final EntryEventImpl event = EntryEventImpl.create(
r,
getOperation(),
key,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
index c706e4f..ff09af9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
@@ -59,6 +59,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
/**
* A Replicate Region putAll message. Meant to be sent only to
@@ -356,7 +357,7 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
final DistributedRegion dr = (DistributedRegion)r;
// create a base event and a DPAO for PutAllMessage distributed btw redundant buckets
- EntryEventImpl baseEvent = EntryEventImpl.create(
+ @Released EntryEventImpl baseEvent = EntryEventImpl.create(
r, Operation.PUTALL_CREATE,
null, null, this.callbackArg, false, eventSender, !skipCallbacks);
try {
@@ -383,7 +384,7 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
// final boolean requiresRegionContext = dr.keyRequiresRegionContext();
InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
for (int i = 0; i < putAllDataCount; ++i) {
- EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(r, myId, eventSender, i, putAllData, false, bridgeContext, posDup, !skipCallbacks, isPutDML);
+ @Released EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(r, myId, eventSender, i, putAllData, false, bridgeContext, posDup, !skipCallbacks, isPutDML);
try {
ev.setPutAllOperation(dpao);
if (logger.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
index 9f51b39..3fccc9a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
@@ -57,6 +57,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.util.BlobHelper;
import com.gemstone.gemfire.internal.util.Breadcrumbs;
@@ -683,7 +684,7 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(r);
}
- this.event = EntryEventImpl.create(
+ @Released EntryEventImpl eei = EntryEventImpl.create(
r,
getOperation(),
getKey(),
@@ -693,6 +694,7 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
eventSender,
true/*generateCallbacks*/,
false/*initializeId*/);
+ this.event = eei;
try {
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
index 7f3a138..01e8d9e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
@@ -59,6 +59,7 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
/**
* A Replicate Region removeAll message. Meant to be sent only to
@@ -340,7 +341,7 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
final DistributedRegion dr = (DistributedRegion)r;
// create a base event and a op for RemoveAllMessage distributed btw redundant buckets
- EntryEventImpl baseEvent = EntryEventImpl.create(
+ @Released EntryEventImpl baseEvent = EntryEventImpl.create(
r, Operation.REMOVEALL_DESTROY,
null, null, this.callbackArg, false, eventSender, true);
try {
@@ -365,7 +366,7 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
public void run() {
InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
for (int i = 0; i < removeAllDataCount; ++i) {
- EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(r, myId, eventSender, i, removeAllData, false, bridgeContext, posDup, false);
+ @Released EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(r, myId, eventSender, i, removeAllData, false, bridgeContext, posDup, false);
try {
ev.setRemoveAllOperation(op);
if (logger.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
index d9729a7..6642ec6 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
@@ -222,8 +222,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) {
return false;
}
- @Released
- CacheEvent listenerEvent = getEventForListener(event);
+ @Released CacheEvent listenerEvent = getEventForListener(event);
try {
if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) {
action = BEFORECREATE;
@@ -870,8 +869,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
return false;
}
}
- @Released
- CacheEvent event = getEventForListener(pevent);
+ @Released CacheEvent event = getEventForListener(pevent);
int action = paction;
if (event.getOperation().isCreate() && action == BEFOREUPDATE) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index d1644c7..4908e09 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -75,6 +75,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
/** TXCommitMessage is the message that contains all the information
* that needs to be distributed, on commit, to other cache members.
@@ -1307,6 +1308,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
/*
* This happens when we don't have the bucket and are getting adjunct notification
*/
+ // No need to release because it is added to pendingCallbacks and they will be released later
EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey);
if(entryOp.filterRoutingInfo!=null) {
eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
@@ -1407,7 +1409,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
/*
* This happens when we don't have the bucket and are getting adjunct notification
*/
- EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey);
+ @Released EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey);
try {
if(entryOp.filterRoutingInfo!=null) {
eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
index 1964c28..a4c6004 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
@@ -948,16 +948,15 @@ public class TXEntryState implements Releasable
// + " isDis=" + isLocalEventDistributed());
// System.out.flush();
// }
+ @Retained
EntryEvent getEvent(LocalRegion r, Object key, TXState txs)
{
// dumpOp();
- //TODO:ASIF : Shopuld we generate EventID ? At this point not generating
LocalRegion eventRegion = r;
if (r.isUsedForPartitionedRegionBucket()) {
eventRegion = r.getPartitionedRegion();
}
- EntryEventImpl result = new TxEntryEventImpl(eventRegion, key);
- // OFFHEAP: freeOffHeapResources on this event is called from TXEvent.freeOffHeapResources.
+ @Retained EntryEventImpl result = new TxEntryEventImpl(eventRegion, key);
boolean returnedResult = false;
try {
if (this.destroy == DESTROY_NONE || isOpDestroy()) {
@@ -2065,6 +2064,7 @@ public class TXEntryState implements Releasable
/**
* Creates a local tx entry event
*/
+ @Retained
TxEntryEventImpl(LocalRegion r, Object key) {
//TODO:ASIF :Check if the eventID should be created. Currently not
// creating it
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
index c948f55..116edf7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
@@ -477,6 +477,7 @@ public class TXRegionState {
Object eKey = me.getKey();
TXEntryState txes = (TXEntryState)me.getValue();
if (txes.isDirty() && txes.isOpAnyEvent(r)) {
+ // OFFHEAP: these events are released when TXEvent.release is called
events.add(txes.getEvent(r, eKey, txs));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
index c0493ac..a19ce92 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
@@ -28,6 +28,8 @@ import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.TransactionEvent;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* <p>
@@ -44,6 +46,8 @@ public class TXRmtEvent implements TransactionEvent
private Cache cache;
+ // This list of EntryEventImpls is released by calling freeOffHeapResources
+ @Released
private List events;
TXRmtEvent(TransactionId txId, Cache cache) {
@@ -204,6 +208,7 @@ public class TXRmtEvent implements TransactionEvent
return (events == null) || events.isEmpty();
}
+ @Retained
private EntryEventImpl createEvent(LocalRegion r, Operation op,
RegionEntry re, Object key, Object newValue,Object aCallbackArgument)
{
@@ -214,7 +219,7 @@ public class TXRmtEvent implements TransactionEvent
if (r.isUsedForPartitionedRegionBucket()) {
eventRegion = r.getPartitionedRegion();
}
- EntryEventImpl event = EntryEventImpl.create(
+ @Retained EntryEventImpl event = EntryEventImpl.create(
eventRegion, op, key, newValue,
aCallbackArgument, // callbackArg
true, // originRemote
@@ -264,7 +269,5 @@ public class TXRmtEvent implements TransactionEvent
e.release();
}
}
- // TODO Auto-generated method stub
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index 3bec397..a67d3cc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -62,6 +62,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/** TXState is the entity that tracks the transaction state on a per
@@ -487,7 +488,7 @@ public class TXState implements TXStateInterface {
/*
* The event must contain the bucket region
*/
- EntryEventImpl ev = (EntryEventImpl)o.es.getEvent(o.r, o.key, o.es.getTXRegionState().getTXState());
+ @Released EntryEventImpl ev = (EntryEventImpl)o.es.getEvent(o.r, o.key, o.es.getTXRegionState().getTXState());
try {
/*
* The routing information is derived from the PR advisor, not the bucket advisor.
@@ -1755,7 +1756,7 @@ public class TXState implements TXStateInterface {
// final boolean requiresRegionContext = theRegion.keyRequiresRegionContext();
InternalDistributedMember myId = theRegion.getDistributionManager().getDistributionManagerId();
for (int i = 0; i < putallOp.putAllDataSize; ++i) {
- EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion, myId,myId, i, putallOp.putAllData, false, putallOp.getBaseEvent().getContext(), false, !putallOp.getBaseEvent().isGenerateCallbacks(), false);
+ @Released EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion, myId,myId, i, putallOp.putAllData, false, putallOp.getBaseEvent().getContext(), false, !putallOp.getBaseEvent().isGenerateCallbacks(), false);
try {
ev.setPutAllOperation(putallOp);
if (theRegion.basicPut(ev, false, false, null, false)) {
@@ -1786,11 +1787,13 @@ public class TXState implements TXStateInterface {
public void run() {
InternalDistributedMember myId = theRegion.getDistributionManager().getDistributionManagerId();
for (int i = 0; i < op.removeAllDataSize; ++i) {
- EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion, myId, myId, i, op.removeAllData, false, op.getBaseEvent().getContext(), false, !op.getBaseEvent().isGenerateCallbacks());
+ @Released EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion, myId, myId, i, op.removeAllData, false, op.getBaseEvent().getContext(), false, !op.getBaseEvent().isGenerateCallbacks());
ev.setRemoveAllOperation(op);
try {
theRegion.basicDestroy(ev, true/* should we invoke cacheWriter? */, null);
} catch (EntryNotFoundException ignore) {
+ } finally {
+ ev.release();
}
successfulOps.addKeyAndVersion(op.removeAllData[i].key, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TimestampedEntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TimestampedEntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TimestampedEntryEventImpl.java
index fca7d62..ce5cfab 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TimestampedEntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TimestampedEntryEventImpl.java
@@ -17,6 +17,7 @@
package com.gemstone.gemfire.internal.cache;
import com.gemstone.gemfire.cache.util.TimestampedEntryEvent;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* A subclass of EntryEventImpl used in WAN conflict resolution
@@ -30,6 +31,7 @@ public class TimestampedEntryEventImpl extends EntryEventImpl implements
private long newTimestamp;
private long oldTimestamp;
+ @Retained
public TimestampedEntryEventImpl(EntryEventImpl event, int newDSID, int oldDSID, long newTimestamp, long oldTimestamp) {
super(event);
this.newDSID = newDSID;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
index a3d1c19..fce4dee 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
@@ -35,6 +35,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* This operation updates Version stamp of an entry if entry is available and
@@ -90,6 +91,7 @@ public class UpdateEntryVersionOperation extends DistributedCacheOperation {
}
@Override
+ @Retained
protected InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException {
@@ -97,7 +99,7 @@ public class UpdateEntryVersionOperation extends DistributedCacheOperation {
((KeyWithRegionContext)this.key).setRegionContext(rgn);
}
- EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key,
+ @Retained EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key,
null /* newValue */, this.callbackArg /*callbackArg*/, true /* originRemote*/ , getSender(), false /*generateCallbacks*/);
ev.setEventId(this.eventId);
ev.setVersionTag(this.versionTag);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
index 452c38f..fc9c7ff 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
@@ -45,6 +45,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.MemoryAllocatorImpl;
import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.util.BlobHelper;
import com.gemstone.gemfire.internal.util.Breadcrumbs;
@@ -198,6 +199,7 @@ public class UpdateOperation extends AbstractUpdateOperation
}
@Override
+ @Retained
protected InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException {
EntryEventImpl ev = createEntryEvent(rgn);
@@ -325,6 +327,7 @@ public class UpdateOperation extends AbstractUpdateOperation
}
}
+ @Retained
protected EntryEventImpl createEntryEvent(DistributedRegion rgn)
{
Object argNewValue = null;
@@ -333,7 +336,7 @@ public class UpdateOperation extends AbstractUpdateOperation
if (rgn.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(rgn);
}
- EntryEventImpl result = EntryEventImpl.create(rgn, getOperation(), this.key,
+ @Retained EntryEventImpl result = EntryEventImpl.create(rgn, getOperation(), this.key,
argNewValue, // oldValue,
this.callbackArg, originRemote, getSender(), generateCallbacks);
setOldValueInEvent(result);
@@ -568,6 +571,7 @@ public class UpdateOperation extends AbstractUpdateOperation
protected transient ClientProxyMembershipID clientID;
@Override
+ @Retained
final public EntryEventImpl createEntryEvent(DistributedRegion rgn)
{
// Object oldValue = null;
@@ -579,7 +583,7 @@ public class UpdateOperation extends AbstractUpdateOperation
if (rgn.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(rgn);
}
- EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key,
+ @Retained EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key,
argNewValue, this.callbackArg, originRemote, getSender(),
generateCallbacks);
ev.setContext(this.clientID);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/VersionTagHolder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/VersionTagHolder.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/VersionTagHolder.java
new file mode 100644
index 0000000..48258df
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/VersionTagHolder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * This class is just used to hold the version tag for an operation.
+ * It does not need release to be called (unlike other EntryEventImpls)
+ * because it never has off-heap value references.
+ */
+public class VersionTagHolder extends EntryEventImpl {
+ public VersionTagHolder(VersionTag<?> tag) {
+ setVersionTag(tag);
+ disallowOffHeapValues();
+ }
+ public VersionTagHolder() {
+ this(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyMessage.java
index a30b9f2..3a6f04e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyMessage.java
@@ -59,6 +59,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* A class that specifies a destroy operation.
@@ -249,7 +251,7 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
if (eventSender == null) {
eventSender = getSender();
}
- EntryEventImpl event = null;
+ @Released EntryEventImpl event = null;
try {
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(r);
@@ -323,7 +325,7 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
}
}
else {
- EntryEventImpl e2 = createListenerEvent(event, r, dm.getDistributionManagerId());
+ @Released EntryEventImpl e2 = createListenerEvent(event, r, dm.getDistributionManagerId());
try {
r.invokeDestroyCallbacks(EnumListenerEvent.AFTER_DESTROY, e2, r.isInitialized(), true);
} finally {
@@ -411,7 +413,10 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
return this.eventId;
}
- /** create a new EntryEvent to be used in notifying listeners, bridge servers, etc. */
+ /** create a new EntryEvent to be used in notifying listeners, bridge servers, etc.
+ * Caller must release the result if it is not the same object as sourceEvent
+ */
+ @Retained
EntryEventImpl createListenerEvent(EntryEventImpl sourceEvent, PartitionedRegion r,
InternalDistributedMember member) {
final EntryEventImpl e2;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
index 93501aa..d3078a9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
@@ -55,6 +55,7 @@ import com.gemstone.gemfire.internal.cache.InitialImageOperation;
import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+import com.gemstone.gemfire.internal.cache.VersionTagHolder;
import com.gemstone.gemfire.internal.cache.tier.InterestType;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -296,8 +297,7 @@ public final class FetchBulkEntriesMessage extends PartitionMessage
while (it.hasNext()) {
Object key = it.next();
- EntryEventImpl clientEvent = EntryEventImpl
- .createVersionTagHolder();
+ VersionTagHolder clientEvent = new VersionTagHolder();
Object value = map.get(key, null, true, true, true, null,
clientEvent, allowTombstones, false);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
index 047d1c7..d7e50f1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
@@ -58,6 +58,7 @@ import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.cache.Token;
+import com.gemstone.gemfire.internal.cache.VersionTagHolder;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -182,7 +183,7 @@ public final class GetMessage extends PartitionMessageWithDirectReply
Object val = null;
try {
if (ds != null) {
- EntryEventImpl event = EntryEventImpl.createVersionTagHolder();
+ VersionTagHolder event = new VersionTagHolder();
try {
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(r);
@@ -214,8 +215,6 @@ public final class GetMessage extends PartitionMessageWithDirectReply
catch (DataLocationException e) {
sendReply(getSender(), getProcessorId(), dm, new ReplyException(e), r, startTime);
return false;
- } finally {
- event.release();
}
if (logger.isTraceEnabled(LogMarker.DM)) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/InvalidateMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/InvalidateMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/InvalidateMessage.java
index 8c14e49..60508c1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/InvalidateMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/InvalidateMessage.java
@@ -54,6 +54,7 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
public final class InvalidateMessage extends DestroyMessage {
private static final Logger logger = LogService.getLogger();
@@ -179,7 +180,7 @@ public final class InvalidateMessage extends DestroyMessage {
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)key).setRegionContext(r);
}
- final EntryEventImpl event = EntryEventImpl.create(
+ @Released final EntryEventImpl event = EntryEventImpl.create(
r,
getOperation(),
key,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRUpdateEntryVersionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRUpdateEntryVersionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRUpdateEntryVersionMessage.java
index 7fcb031..e1766dc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRUpdateEntryVersionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRUpdateEntryVersionMessage.java
@@ -130,6 +130,7 @@ public class PRUpdateEntryVersionMessage extends
((KeyWithRegionContext) key).setRegionContext(pr);
}
+ // release not needed because disallowOffHeapValues called
final EntryEventImpl event = EntryEventImpl.create(pr, getOperation(),
getKey(), null, /* newValue */
null, /* callbackargs */
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
index cace5cc..a88f96f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
@@ -72,6 +72,8 @@ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationE
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* A Partitioned Region update message. Meant to be sent only to
@@ -359,12 +361,13 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
}
/* we need a event with content for waitForNodeOrCreateBucket() */
+ @Retained
public EntryEventImpl getFirstEvent(PartitionedRegion r) {
if (putAllPRDataSize == 0) {
return null;
}
- EntryEventImpl ev = EntryEventImpl.create(r,
+ @Retained EntryEventImpl ev = EntryEventImpl.create(r,
putAllPRData[0].getOp(),
putAllPRData[0].getKey(),
putAllPRData[0].getValue(),
@@ -408,7 +411,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
}
DistributedPutAllOperation dpao = null;
- EntryEventImpl baseEvent = null;
+ @Released EntryEventImpl baseEvent = null;
BucketRegion bucketRegion = null;
PartitionedRegionDataStore ds = r.getDataStore();
InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
@@ -486,7 +489,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
* in this request, because these request will be blocked by foundKey
*/
for (int i=0; i<putAllPRDataSize; i++) {
- EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i,putAllPRData,notificationOnly,bridgeContext,posDup,skipCallbacks, this.isPutDML);
+ @Released EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i,putAllPRData,notificationOnly,bridgeContext,posDup,skipCallbacks, this.isPutDML);
try {
key = ev.getKey();
@@ -596,6 +599,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
return true;
}
+ @Retained
public static EntryEventImpl getEventFromEntry(LocalRegion r,
InternalDistributedMember myId, InternalDistributedMember eventSender,
int idx, DistributedPutAllOperation.PutAllEntryData[] data,
@@ -610,7 +614,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
// true/* generate Callbacks */,
// prd.getEventID());
- EntryEventImpl ev = EntryEventImpl.create(r, prd.getOp(), prd.getKey(), prd
+ @Retained EntryEventImpl ev = EntryEventImpl.create(r, prd.getOp(), prd.getKey(), prd
.getValue(), null, false, eventSender, !skipCallbacks, prd.getEventID());
boolean evReturned = false;
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
index 77a47b7..db11a8e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
@@ -67,6 +67,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.util.BlobHelper;
@@ -450,7 +452,10 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
// }
- /** create a new EntryEvent to be used in notifying listeners, bridge servers, etc. */
+ /** create a new EntryEvent to be used in notifying listeners, bridge servers, etc.
+ * Caller must release the result if it is not the same object as sourceEvent
+ */
+ @Retained
EntryEventImpl createListenerEvent(EntryEventImpl sourceEvent, PartitionedRegion r,
InternalDistributedMember member) {
final EntryEventImpl e2;
@@ -795,7 +800,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(r);
}
- final EntryEventImpl ev = EntryEventImpl.create(
+ @Released final EntryEventImpl ev = EntryEventImpl.create(
r,
getOperation(),
getKey(),
@@ -906,7 +911,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
}
}
else { // notificationOnly
- EntryEventImpl e2 = createListenerEvent(ev, r, dm.getDistributionManagerId());
+ @Released EntryEventImpl e2 = createListenerEvent(ev, r, dm.getDistributionManagerId());
final EnumListenerEvent le;
try {
if (e2.getOperation().isCreate()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RemoveAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RemoveAllPRMessage.java
index c907943..fb46e0c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -73,6 +73,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* PR removeAll
@@ -339,12 +341,13 @@ public final class RemoveAllPRMessage extends PartitionMessageWithDirectReply
}
/* we need a event with content for waitForNodeOrCreateBucket() */
+ @Retained
public EntryEventImpl getFirstEvent(PartitionedRegion r) {
if (removeAllPRDataSize == 0) {
return null;
}
- EntryEventImpl ev = EntryEventImpl.create(r,
+ @Retained EntryEventImpl ev = EntryEventImpl.create(r,
removeAllPRData[0].getOp(),
removeAllPRData[0].getKey(),
null /*value*/,
@@ -388,7 +391,7 @@ public final class RemoveAllPRMessage extends PartitionMessageWithDirectReply
}
DistributedRemoveAllOperation op = null;
- EntryEventImpl baseEvent = null;
+ @Released EntryEventImpl baseEvent = null;
BucketRegion bucketRegion = null;
PartitionedRegionDataStore ds = r.getDataStore();
InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
@@ -466,7 +469,7 @@ public final class RemoveAllPRMessage extends PartitionMessageWithDirectReply
* in this request, because these request will be blocked by foundKey
*/
for (int i=0; i<removeAllPRDataSize; i++) {
- EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i,removeAllPRData,notificationOnly,bridgeContext,posDup,skipCallbacks);
+ @Released EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i,removeAllPRData,notificationOnly,bridgeContext,posDup,skipCallbacks);
try {
key = ev.getKey();
@@ -574,14 +577,14 @@ public final class RemoveAllPRMessage extends PartitionMessageWithDirectReply
public boolean canStartRemoteTransaction() {
return true;
}
-
+ @Retained
public static EntryEventImpl getEventFromEntry(LocalRegion r,
InternalDistributedMember myId, InternalDistributedMember eventSender,
int idx, DistributedRemoveAllOperation.RemoveAllEntryData[] data,
boolean notificationOnly, ClientProxyMembershipID bridgeContext,
boolean posDup, boolean skipCallbacks) {
RemoveAllEntryData dataItem = data[idx];
- EntryEventImpl ev = EntryEventImpl.create(r, dataItem.getOp(), dataItem.getKey(), null, null, false, eventSender, !skipCallbacks, dataItem.getEventID());
+ @Retained EntryEventImpl ev = EntryEventImpl.create(r, dataItem.getOp(), dataItem.getKey(), null, null, false, eventSender, !skipCallbacks, dataItem.getEventID());
boolean evReturned = false;
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index 0be56f9..c264616 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@ -69,6 +69,7 @@ import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.cache.Token;
+import com.gemstone.gemfire.internal.cache.VersionTagHolder;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
import com.gemstone.gemfire.internal.cache.tier.Command;
import com.gemstone.gemfire.internal.cache.tier.InterestType;
@@ -1141,7 +1142,7 @@ public abstract class BaseCommand implements Command {
if (region != null) {
if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
- EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
+ VersionTagHolder versionHolder = new VersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
// From Get70.getValueAndIsObject()
Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false);
@@ -1237,7 +1238,7 @@ public abstract class BaseCommand implements Command {
}
for (Object key : region.keySet(true)) {
- EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
+ VersionTagHolder versionHolder = new VersionTagHolder();
if (keyPattern != null) {
if (!(key instanceof String)) {
// key is not a String, cannot apply regex to this entry
@@ -1338,11 +1339,11 @@ public abstract class BaseCommand implements Command {
VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
throws IOException {
Object key = null;
- EntryEventImpl versionHolder = null;
+ VersionTagHolder versionHolder = null;
ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
for (Iterator it = keySet.iterator(); it.hasNext();) {
key = it.next();
- versionHolder = EntryEventImpl.createVersionTagHolder();
+ versionHolder = new VersionTagHolder();
Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false);
@@ -1545,8 +1546,7 @@ public abstract class BaseCommand implements Command {
for (Iterator it = keyList.iterator(); it.hasNext();) {
Object key = it.next();
if (region.containsKey(key) || region.containsTombstone(key)) {
- EntryEventImpl versionHolder = EntryEventImpl
- .createVersionTagHolder();
+ VersionTagHolder versionHolder = new VersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn
.getProxyID();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
index 8968f62..ce067d7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -90,6 +90,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
import com.gemstone.gemfire.security.AuthenticationFailedException;
import com.gemstone.gemfire.security.AuthenticationRequiredException;
@@ -771,7 +772,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater,
else if (region.hasServerProxy()
&& ServerResponseMatrix.checkForValidStateAfterNotification(region,
key, m.getMessageType()) && (withInterest || !withCQs)) {
- EntryEventImpl newEvent = null;
+ @Released EntryEventImpl newEvent = null;
try {
// Create an event and put the entry
newEvent = EntryEventImpl.create(
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
index 888cf62..593cc08 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
@@ -21,6 +21,7 @@ package com.gemstone.gemfire.internal.cache.tier.sockets.command;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
@@ -152,7 +153,7 @@ public class Destroy extends BaseCommand {
}
}
region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(),
- true, new EntryEventImpl(eventId));
+ true, new EventIDHolder(eventId));
servConn.setModificationInfo(true, regionName, key);
}
catch (EntryNotFoundException e) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
index 513e902..1da422a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
@@ -24,6 +24,7 @@ import com.gemstone.gemfire.internal.cache.Token;
import com.gemstone.gemfire.internal.cache.OpType;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
@@ -201,7 +202,7 @@ public class Destroy65 extends BaseCommand {
.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
EventID eventId = new EventID(servConn.getEventMemberIDByteArray(),
threadId, sequenceId);
- EntryEventImpl clientEvent = new EntryEventImpl(eventId);
+ EventIDHolder clientEvent = new EventIDHolder(eventId);
Breadcrumbs.setEventId(eventId);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index e2fb686..fce1175 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -37,6 +37,7 @@ import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
import com.gemstone.gemfire.internal.cache.LocalRegion;
@@ -194,7 +195,7 @@ public class GatewayReceiverCommand extends BaseCommand {
int actionType = actionTypePart.getInt();
long versionTimeStamp = VersionTag.ILLEGAL_VERSION_TIMESTAMP;
- EntryEventImpl clientEvent = null;
+ EventIDHolder clientEvent = null;
boolean callbackArgExists = false;
@@ -306,7 +307,7 @@ public class GatewayReceiverCommand extends BaseCommand {
if (region == null) {
handleRegionNull(servConn, regionName, batchId);
} else {
- clientEvent = new EntryEventImpl(eventId);
+ clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
tag.setIsGatewayTag(true);
@@ -415,7 +416,7 @@ public class GatewayReceiverCommand extends BaseCommand {
if (region == null) {
handleRegionNull(servConn, regionName, batchId);
} else {
- clientEvent = new EntryEventImpl(eventId);
+ clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
tag.setIsGatewayTag(true);
@@ -514,7 +515,7 @@ public class GatewayReceiverCommand extends BaseCommand {
if (region == null) {
handleRegionNull(servConn, regionName, batchId);
} else {
- clientEvent = new EntryEventImpl(eventId);
+ clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
tag.setIsGatewayTag(true);
@@ -596,7 +597,7 @@ public class GatewayReceiverCommand extends BaseCommand {
handleRegionNull(servConn, regionName, batchId);
} else {
- clientEvent = new EntryEventImpl(eventId);
+ clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
VersionTag tag = VersionTag.create(region.getVersionMember());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
index e382c57..b2be3c3 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
@@ -30,6 +30,7 @@ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.Token;
+import com.gemstone.gemfire.internal.cache.VersionTagHolder;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
import com.gemstone.gemfire.internal.cache.tier.Command;
import com.gemstone.gemfire.internal.cache.tier.MessageType;
@@ -303,13 +304,9 @@ public class Get70 extends BaseCommand {
// }
// } else {
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
- try {
+ VersionTagHolder versionHolder = new VersionTagHolder();
// TODO OFFHEAP: optimize
data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true, true /*allowReadFromHDFS*/);
- }finally {
- versionHolder.release();
- }
// }
versionTag = versionHolder.getVersionTag();
@@ -368,12 +365,8 @@ public class Get70 extends BaseCommand {
@Retained Object data = null;
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
- try {
- data = ((LocalRegion) region).getRetained(key, callbackArg, true, true, id, versionHolder, true);
- }finally {
- versionHolder.release();
- }
+ VersionTagHolder versionHolder = new VersionTagHolder();
+ data = ((LocalRegion) region).getRetained(key, callbackArg, true, true, id, versionHolder, true);
versionTag = versionHolder.getVersionTag();
// If it is Token.REMOVED, Token.DESTROYED,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
index b037a88..3f313c9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
@@ -28,6 +28,7 @@ import com.gemstone.gemfire.cache.operations.InvalidateOperationContext;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
@@ -153,7 +154,7 @@ public class Invalidate extends BaseCommand {
.invalidateAuthorize(regionName, key, callbackArg);
callbackArg = invalidateContext.getCallbackArg();
}
- EntryEventImpl clientEvent = new EntryEventImpl(eventId);
+ EventIDHolder clientEvent = new EventIDHolder(eventId);
// msg.isRetry might be set by v7.0 and later clients
if (msg.isRetry()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put.java
index 46f4471..35cc085 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put.java
@@ -21,6 +21,7 @@ package com.gemstone.gemfire.internal.cache.tier.sockets.command;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
import com.gemstone.gemfire.internal.cache.tier.Command;
@@ -192,12 +193,12 @@ public class Put extends BaseCommand {
// isObject
// the true after null doesn't matter and is not used.
result = region.basicBridgeCreate(key, null, true, callbackArg,
- servConn.getProxyID(), true, new EntryEventImpl(eventId), false);
+ servConn.getProxyID(), true, new EventIDHolder(eventId), false);
}
else {
// Put the entry
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
- servConn.getProxyID(), true, new EntryEventImpl(eventId), servConn.isSqlFabricSystem());
+ servConn.getProxyID(), true, new EventIDHolder(eventId), servConn.isSqlFabricSystem());
}
if (result) {
servConn.setModificationInfo(true, regionName, key);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
index 2943091..cc4c1d6 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
@@ -30,6 +30,7 @@ import com.gemstone.gemfire.cache.operations.PutOperationContext;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
@@ -223,7 +224,7 @@ public class Put61 extends BaseCommand {
// isObject
// the true after null doesn't matter and is not used.
result = region.basicBridgeCreate(key, null, true, callbackArg,
- servConn.getProxyID(), true, new EntryEventImpl(eventId), false);
+ servConn.getProxyID(), true, new EventIDHolder(eventId), false);
}
else {
// Put the entry
@@ -232,7 +233,7 @@ public class Put61 extends BaseCommand {
delta = valuePart.getSerializedForm();
}
result = region.basicBridgePut(key, value, delta, isObject,
- callbackArg, servConn.getProxyID(), true, new EntryEventImpl(eventId), servConn
+ callbackArg, servConn.getProxyID(), true, new EventIDHolder(eventId), servConn
.isSqlFabricSystem());
}
if (result) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
index 092e2ee..9be1390 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
@@ -37,6 +37,7 @@ import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
@@ -220,7 +221,7 @@ public class Put65 extends BaseCommand {
long sequenceId = EventID
.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EntryEventImpl clientEvent = new EntryEventImpl(
+ EventIDHolder clientEvent = new EventIDHolder(
new EventID(servConn.getEventMemberIDByteArray(),
threadId, sequenceId));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistTxEntryEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistTxEntryEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistTxEntryEvent.java
index a108965..6e7c21c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistTxEntryEvent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistTxEntryEvent.java
@@ -36,6 +36,7 @@ import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.EntryVersionsList;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
*
@@ -46,7 +47,13 @@ public class DistTxEntryEvent extends EntryEventImpl {
protected static final byte HAS_PUTALL_OP = 0x1;
protected static final byte HAS_REMOVEALL_OP = 0x2;
- // For Serialization
+ /**
+ * TODO DISTTX: callers of this constructor need to
+ * make sure that release is called. In general
+ * the distributed tx code needs to be reviewed to
+ * see if it correctly handles off-heap.
+ */
+ @Retained
public DistTxEntryEvent(EntryEventImpl entry) {
super(entry);
}
@@ -179,7 +186,7 @@ public class DistTxEntryEvent extends EntryEventImpl {
}
}
}
-
+ // TODO DISTTX: release this event?
EntryEventImpl e = EntryEventImpl.create(
this.region, Operation.PUTALL_CREATE,
null, null, null, true, this.getDistributedMember(), true, true);
@@ -246,7 +253,7 @@ public class DistTxEntryEvent extends EntryEventImpl {
removeAllData[i].versionTag = versionTags.get(i);
}
}
-
+ // TODO DISTTX: release this event
EntryEventImpl e = EntryEventImpl.create(
this.region, Operation.REMOVEALL_DESTROY,
null, null, null, true, this.getDistributedMember(), true, true);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
index 323dcc4..6723646 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
@@ -46,6 +46,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import java.util.Collections;
import java.util.HashMap;
@@ -386,7 +387,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
partialKeys.consolidate(pre.getResult());
} catch (Exception ex) {
// If failed at other exception
- EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
+ @Released EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
try {
partialKeys.saveFailedKey(firstEvent.getKey(), ex);
} finally {
@@ -444,8 +445,12 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
partialKeys.consolidate(pre.getResult());
} catch (Exception ex) {
// If failed at other exception
- EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
+ @Released EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
+ try {
partialKeys.saveFailedKey(firstEvent.getKey(), ex);
+ } finally {
+ firstEvent.release();
+ }
}
}
pr.prStats.endRemoveAll(startTime);
@@ -477,7 +482,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
// retry the put remotely until it finds the right node managing the bucket
InternalDistributedMember currentTarget = pr.getOrCreateNodeForBucketWrite(bucketId.intValue(), null);
if(!currentTarget.equals(this.state.getTarget())) {
- EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
+ @Released EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
try {
throw new TransactionDataNotColocatedException(LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(firstEvent.getKey()));
} finally {
@@ -507,8 +512,12 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
// retry the put remotely until it finds the right node managing the bucket
InternalDistributedMember currentTarget = pr.getOrCreateNodeForBucketWrite(bucketId.intValue(), null);
if(!currentTarget.equals(this.state.getTarget())) {
- EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
+ @Released EntryEventImpl firstEvent = prMsg.getFirstEvent(pr);
+ try {
throw new TransactionDataNotColocatedException(LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(firstEvent.getKey()));
+ } finally {
+ firstEvent.release();
+ }
}
try {
return pr.tryToSendOneRemoveAllMessage(prMsg,currentTarget);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 4f3488b..94524bd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -74,6 +74,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.offheap.Releasable;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
@@ -875,8 +876,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
getStatistics().incEventsFiltered();
return;
}
-
- EntryEventImpl clonedEvent = new EntryEventImpl(event, false);
+ // released by this method or transfers ownership to TmpQueueEvent
+ @Released EntryEventImpl clonedEvent = new EntryEventImpl(event, false);
boolean freeClonedEvent = true;
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9cde6dd..2ef5e19 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -816,10 +816,12 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
if (pdxRegion != null && pdxRegion.size() != pdxEventsMap.size()) {
for (Map.Entry<Object, Object> typeEntry : pdxRegion.entrySet()) {
if(!pdxEventsMap.containsKey(typeEntry.getKey())){
+ // event should never be off-heap so it does not need to be released
EntryEventImpl event = EntryEventImpl.create(
(LocalRegion) pdxRegion, Operation.UPDATE,
typeEntry.getKey(), typeEntry.getValue(), null, false,
cache.getMyId());
+ event.disallowOffHeapValues();
event.setEventId(new EventID(cache.getSystem()));
List<Integer> allRemoteDSIds = new ArrayList<Integer>();
for (GatewaySender sender : cache.getGatewaySenders()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/BatchDestroyOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/BatchDestroyOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/BatchDestroyOperation.java
index 988c526..717253e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/BatchDestroyOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/BatchDestroyOperation.java
@@ -44,6 +44,7 @@ import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* Handles distribution messaging for destroying a batch of entry in a queue region.
@@ -157,6 +158,7 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
}
@Override
+ @Retained
protected final InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException {
EntryEventImpl ev = createEntryEvent(rgn);
@@ -176,8 +178,9 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
}
}
+ @Retained
EntryEventImpl createEntryEvent(DistributedRegion rgn) {
- EntryEventImpl event = EntryEventImpl.create(rgn, getOperation(), this.key,
+ @Retained EntryEventImpl event = EntryEventImpl.create(rgn, getOperation(), this.key,
null, this.callbackArg, true, getSender());
// event.setNewEventId(); Don't set the event here...
setOldValueInEvent(event);