You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/22 19:28:15 UTC
[07/25] incubator-geode git commit: GEODE-1238: ensure EntryEventImpls are released
GEODE-1238: ensure EntryEventImpls are released
-- reviewed all EntryEventImpl creators
-- created two new subclasses to mark ones that are never off-heap
-- added annotations and comments on all creators
-- added missing release calls
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/70221350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/70221350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/70221350
Branch: refs/heads/feature/GEODE-17-2
Commit: 7022135035f5a1804eff38c2a159c0352728373d
Parents: ff914bd
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Apr 15 11:42:19 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Apr 21 10:53:30 2016 -0700
----------------------------------------------------------------------
.../client/internal/ServerRegionProxy.java | 3 +-
.../internal/admin/ClientStatsManager.java | 3 +-
.../cache/AbstractBucketRegionQueue.java | 3 +-
.../internal/cache/AbstractLRURegionMap.java | 4 +
.../internal/cache/AbstractRegionEntry.java | 2 +-
.../internal/cache/AbstractRegionMap.java | 15 +-
.../gemfire/internal/cache/BucketRegion.java | 14 +-
.../internal/cache/BucketRegionQueue.java | 7 +-
.../internal/cache/DestroyOperation.java | 6 +-
.../internal/cache/DistPeerTXStateStub.java | 2 +
.../gemfire/internal/cache/DistTXState.java | 9 +-
.../cache/DistTXStateOnCoordinator.java | 2 +
.../DistTXStateProxyImplOnCoordinator.java | 2 +
.../cache/DistributedCacheOperation.java | 4 +-
.../cache/DistributedPutAllOperation.java | 23 ++-
.../internal/cache/DistributedRegion.java | 9 +-
.../cache/DistributedRemoveAllOperation.java | 22 ++-
.../gemfire/internal/cache/EntryEventImpl.java | 41 +-----
.../gemfire/internal/cache/EntryExpiryTask.java | 9 +-
.../gemfire/internal/cache/EventIDHolder.java | 35 +++++
.../gemfire/internal/cache/FilterProfile.java | 13 +-
.../gemfire/internal/cache/HARegion.java | 5 +-
.../internal/cache/InitialImageOperation.java | 2 +-
.../internal/cache/InvalidateOperation.java | 5 +-
.../gemfire/internal/cache/LocalRegion.java | 141 ++++++++++---------
.../internal/cache/PartitionedRegion.java | 13 +-
.../gemfire/internal/cache/ProxyRegionMap.java | 7 +-
.../gemfire/internal/cache/QueuedOperation.java | 3 +-
.../internal/cache/RemoteDestroyMessage.java | 3 +-
.../internal/cache/RemoteInvalidateMessage.java | 4 +-
.../internal/cache/RemotePutAllMessage.java | 5 +-
.../internal/cache/RemotePutMessage.java | 4 +-
.../internal/cache/RemoteRemoveAllMessage.java | 5 +-
.../cache/SearchLoadAndWriteProcessor.java | 6 +-
.../gemfire/internal/cache/TXCommitMessage.java | 4 +-
.../gemfire/internal/cache/TXEntryState.java | 6 +-
.../gemfire/internal/cache/TXRegionState.java | 1 +
.../gemfire/internal/cache/TXRmtEvent.java | 9 +-
.../gemfire/internal/cache/TXState.java | 9 +-
.../cache/TimestampedEntryEventImpl.java | 2 +
.../cache/UpdateEntryVersionOperation.java | 4 +-
.../gemfire/internal/cache/UpdateOperation.java | 8 +-
.../internal/cache/VersionTagHolder.java | 34 +++++
.../cache/partitioned/DestroyMessage.java | 11 +-
.../partitioned/FetchBulkEntriesMessage.java | 4 +-
.../internal/cache/partitioned/GetMessage.java | 5 +-
.../cache/partitioned/InvalidateMessage.java | 3 +-
.../PRUpdateEntryVersionMessage.java | 1 +
.../cache/partitioned/PutAllPRMessage.java | 12 +-
.../internal/cache/partitioned/PutMessage.java | 11 +-
.../cache/partitioned/RemoveAllPRMessage.java | 13 +-
.../cache/tier/sockets/BaseCommand.java | 12 +-
.../cache/tier/sockets/CacheClientUpdater.java | 3 +-
.../cache/tier/sockets/command/Destroy.java | 3 +-
.../cache/tier/sockets/command/Destroy65.java | 3 +-
.../sockets/command/GatewayReceiverCommand.java | 11 +-
.../cache/tier/sockets/command/Get70.java | 15 +-
.../cache/tier/sockets/command/Invalidate.java | 3 +-
.../cache/tier/sockets/command/Put.java | 5 +-
.../cache/tier/sockets/command/Put61.java | 5 +-
.../cache/tier/sockets/command/Put65.java | 3 +-
.../internal/cache/tx/DistTxEntryEvent.java | 13 +-
.../cache/tx/PartitionedTXRegionStub.java | 17 ++-
.../cache/wan/AbstractGatewaySender.java | 5 +-
.../AbstractGatewaySenderEventProcessor.java | 2 +
.../cache/wan/serial/BatchDestroyOperation.java | 5 +-
...urrentSerialGatewaySenderEventProcessor.java | 3 +-
.../wan/serial/SerialGatewaySenderQueue.java | 2 +-
.../DistributedAckRegionCCEDUnitTest.java | 6 +-
.../internal/cache/UpdateVersionJUnitTest.java | 4 +-
.../cache/ha/EventIdOptimizationDUnitTest.java | 5 +-
.../DestroyEntryPropagationDUnitTest.java | 5 +-
.../internal/cache/UpdateVersionDUnitTest.java | 4 +-
73 files changed, 437 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java
index 40f240f..23e04fd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java
@@ -42,6 +42,7 @@ import com.gemstone.gemfire.internal.cache.ClientServerObserver;
import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.EventIDHolder;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.TXCommitMessage;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
@@ -192,7 +193,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
EventID eventId,
Object callbackArg)
{
- EntryEventImpl event = new EntryEventImpl(eventId);
+ EventIDHolder event = new EventIDHolder(eventId);
PutOp.execute(con, this.pool, this.regionName, key, value, event, callbackArg, this.pool.getPRSingleHopEnabled());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/ClientStatsManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/ClientStatsManager.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/ClientStatsManager.java
index 20f6dbf..2db7eb1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/ClientStatsManager.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/ClientStatsManager.java
@@ -36,6 +36,7 @@ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.management.internal.cli.CliUtil;
/**
@@ -84,7 +85,7 @@ public class ClientStatsManager {
ServerRegionProxy regionProxy = new ServerRegionProxy(ClientHealthMonitoringRegion.ADMIN_REGION_NAME, pool);
EventID eventId = new EventID(ds);
- EntryEventImpl event = new EntryEventImpl((Object)null);
+ @Released EntryEventImpl event = new EntryEventImpl((Object)null);
try {
event.setEventId(eventId);
regionProxy.putForMetaRegion(ds.getMemberId(), stats, null, event, null, true);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
index 2c8f493..1f8da88 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
@@ -48,6 +48,7 @@ import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
public abstract class AbstractBucketRegionQueue extends BucketRegion {
protected static final Logger logger = LogService.getLogger();
@@ -232,7 +233,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
if (logger.isDebugEnabled()) {
logger.debug(" destroying primary key {}", key);
}
- EntryEventImpl event = getPartitionedRegion().newDestroyEntryEvent(key,
+ @Released EntryEventImpl event = getPartitionedRegion().newDestroyEntryEvent(key,
null);
event.setEventId(new EventID(cache.getSystem()));
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
index a19ee66..2cc7a55 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
@@ -177,6 +177,10 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap {
// make sure this cached deserializable is still in the entry
// @todo what if a clear is done and this entry is no longer in the region?
{
+ if (_getCCHelper().getEvictionAlgorithm().isLRUEntry()) {
+ // no need to worry about the value changing form with entry LRU.
+ return false;
+ }
Object curVal = le._getValue(); // OFFHEAP: _getValue ok
if (curVal != cd) {
if (cd instanceof StoredObject) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index e78c8eb..debc3da 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -2070,7 +2070,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
newValue[0] = v;
}
};
- TimestampedEntryEventImpl timestampedEvent =
+ @Released TimestampedEntryEventImpl timestampedEvent =
(TimestampedEntryEventImpl)event.getTimestampedEvent(tagDsid, stampDsid, tagTime, stampTime);
// gateway conflict resolvers will usually want to see the old value
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 1cb7a20..3286373 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -1626,7 +1626,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// a receipt of a TXCommitMessage AND there are callbacks installed
// for this region
boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
- EntryEventImpl cbEvent = createCBEvent(owner, op,
+ @Released EntryEventImpl cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
@@ -1858,7 +1858,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// the destroy is already applied on the Initial image provider, thus
// causing region entry to be absent.
// Notify clients with client events.
- EntryEventImpl cbEvent = createCBEvent(owner, op,
+ @Released EntryEventImpl cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument,
filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
@@ -2397,7 +2397,7 @@ public abstract class AbstractRegionMap implements RegionMap {
final LocalRegion owner = _getOwner();
owner.checkBeforeEntrySync(txEvent);
- EntryEventImpl cbEvent = null;
+ @Released EntryEventImpl cbEvent = null;
boolean forceNewEntry = !owner.isInitialized() && owner.isAllEvents();
final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId());
@@ -2997,7 +2997,7 @@ public abstract class AbstractRegionMap implements RegionMap {
stats.incEvictionsInProgress();
// set the flag on event saying the entry should be evicted
// and not indexed
- EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(),
+ @Released EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(),
null/* newValue */, null, false, owner.getMyId());
try {
@@ -3340,8 +3340,8 @@ public abstract class AbstractRegionMap implements RegionMap {
final boolean isTXHost = txEntryState != null;
final boolean isClientTXOriginator = owner.cache.isClient() && !hasRemoteOrigin;
final boolean isRegionReady = owner.isInitialized();
- EntryEventImpl cbEvent = null;
- EntryEventImpl sqlfEvent = null;
+ @Released EntryEventImpl cbEvent = null;
+ @Released EntryEventImpl sqlfEvent = null;
boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady);
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner, putOp, key, newValue, txId,
@@ -3785,6 +3785,7 @@ public abstract class AbstractRegionMap implements RegionMap {
}
/** create a callback event for applying a transactional change to the local cache */
+ @Retained
public static final EntryEventImpl createCBEvent(final LocalRegion re,
Operation op, Object key, Object newValue, TransactionId txId,
TXRmtEvent txEvent,EventID eventId, Object aCallbackArgument,FilterRoutingInfo filterRoutingInfo,ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag, long tailKey)
@@ -3799,7 +3800,7 @@ public abstract class AbstractRegionMap implements RegionMap {
eventRegion = re.getPartitionedRegion();
}
- EntryEventImpl retVal = EntryEventImpl.create(
+ @Retained EntryEventImpl retVal = EntryEventImpl.create(
re, op, key, newValue,
aCallbackArgument,
txEntryState == null, originator);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index 3946fda..e0f043b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -97,6 +97,8 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.concurrent.AtomicLong5;
@@ -1118,6 +1120,7 @@ implements Bucket
* @return an event for EVICT_DESTROY
*/
@Override
+ @Retained
protected EntryEventImpl generateEvictDestroyEvent(Object key) {
EntryEventImpl event = super.generateEvictDestroyEvent(key);
event.setInvokePRCallbacks(true); //see bug 40797
@@ -1620,6 +1623,7 @@ implements Bucket
//we already distributed this info.
}
+ @Retained
EntryEventImpl createEventForPR(EntryEventImpl sourceEvent) {
EntryEventImpl e2 = new EntryEventImpl(sourceEvent);
boolean returned = false;
@@ -1663,7 +1667,7 @@ implements Bucket
}
super.invokeTXCallbacks(eventType, event, callThem);
}
- final EntryEventImpl prevent = createEventForPR(event);
+ @Released final EntryEventImpl prevent = createEventForPR(event);
try {
this.partitionedRegion.invokeTXCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false);
} finally {
@@ -1691,7 +1695,7 @@ implements Bucket
}
super.invokeDestroyCallbacks(eventType, event, callThem, notifyGateways);
}
- final EntryEventImpl prevent = createEventForPR(event);
+ @Released final EntryEventImpl prevent = createEventForPR(event);
try {
this.partitionedRegion.invokeDestroyCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false, false);
} finally {
@@ -1718,7 +1722,7 @@ implements Bucket
}
super.invokeInvalidateCallbacks(eventType, event, callThem);
}
- final EntryEventImpl prevent = createEventForPR(event);
+ @Released final EntryEventImpl prevent = createEventForPR(event);
try {
this.partitionedRegion.invokeInvalidateCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false);
} finally {
@@ -1749,7 +1753,7 @@ implements Bucket
super.invokePutCallbacks(eventType, event, callThem, notifyGateways);
}
- final EntryEventImpl prevent = createEventForPR(event);
+ @Released final EntryEventImpl prevent = createEventForPR(event);
try {
this.partitionedRegion.invokePutCallbacks(eventType, prevent,
this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false, false);
@@ -2577,7 +2581,7 @@ implements Bucket
public boolean customEvictDestroy(Object key)
{
checkReadiness();
- final EntryEventImpl event =
+ @Released final EntryEventImpl event =
generateCustomEvictDestroyEvent(key);
event.setCustomEviction(true);
boolean locked = false;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
index 6b76c31..8b4f240 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
@@ -53,6 +53,7 @@ import com.gemstone.gemfire.internal.concurrent.Atomics;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
/**
*
@@ -504,10 +505,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(" destroying primary key {}", key);
}
- EntryEventImpl event = getPartitionedRegion().newDestroyEntryEvent(key,
+ @Released EntryEventImpl event = getPartitionedRegion().newDestroyEntryEvent(key,
null);
- event.setEventId(new EventID(cache.getSystem()));
try {
+ event.setEventId(new EventID(cache.getSystem()));
event.setRegion(this);
basicDestroy(event, true, null);
checkReadiness();
@@ -531,8 +532,6 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
+ key, rde);
}
} finally {
- //merge42180: are we considering offheap in cedar. Comment freeOffHeapReference intentionally
- //event.freeOffHeapReferences();
event.release();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyOperation.java
index b0ac963..e267190 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyOperation.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* Handles distribution messaging for destroying an entry in a region.
@@ -125,6 +126,7 @@ public class DestroyOperation extends DistributedCacheOperation
}
@Override
+ @Retained
protected final InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException {
if (rgn.keyRequiresRegionContext()) {
@@ -151,9 +153,10 @@ public class DestroyOperation extends DistributedCacheOperation
}
}
+ @Retained
EntryEventImpl createEntryEvent(DistributedRegion rgn)
{
- EntryEventImpl event = EntryEventImpl.create(rgn,
+ @Retained EntryEventImpl event = EntryEventImpl.create(rgn,
getOperation(), this.key, null, this.callbackArg, true, getSender());
// event.setNewEventId(); Don't set the event here...
setOldValueInEvent(event);
@@ -258,6 +261,7 @@ public class DestroyOperation extends DistributedCacheOperation
}
@Override
+ @Retained
EntryEventImpl createEntryEvent(DistributedRegion rgn)
{
EntryEventImpl event = EntryEventImpl.create(rgn, getOperation(),
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java
index cd17770..5c53df1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java
@@ -259,6 +259,7 @@ public final class DistPeerTXStateStub extends PeerTXStateStub implements
public void postPutAll(DistributedPutAllOperation putallOp,
VersionedObjectList successfulPuts, LocalRegion region) {
super.postPutAll(putallOp, successfulPuts, region);
+ // TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createPutAllEvent(putallOp, region,
Operation.PUTALL_CREATE, putallOp.getBaseEvent().getKey(), putallOp
.getBaseEvent().getValue());
@@ -271,6 +272,7 @@ public final class DistPeerTXStateStub extends PeerTXStateStub implements
public void postRemoveAll(DistributedRemoveAllOperation removeAllOp,
VersionedObjectList successfulOps, LocalRegion region) {
super.postRemoveAll(removeAllOp, successfulOps, region);
+ // TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(removeAllOp,
region, removeAllOp.getBaseEvent().getKey());
event.setEventId(removeAllOp.getBaseEvent().getEventId());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
index 19cc175..f8475ae 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
@@ -44,6 +44,7 @@ import com.gemstone.gemfire.internal.cache.tx.DistTxKeyInfo;
import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
/**
* TxState on a datanode VM
@@ -568,7 +569,7 @@ public class DistTXState extends TXState {
InternalDistributedMember myId = theRegion.getDistributionManager()
.getDistributionManagerId();
for (int i = 0; i < putallOp.putAllDataSize; ++i) {
- EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion,
+ @Released EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion,
myId, myId, i, putallOp.putAllData, false, putallOp
.getBaseEvent().getContext(), false, !putallOp.getBaseEvent()
.isGenerateCallbacks(), false);
@@ -628,10 +629,11 @@ public class DistTXState extends TXState {
InternalDistributedMember myId = theRegion.getDistributionManager()
.getDistributionManagerId();
for (int i = 0; i < op.removeAllDataSize; ++i) {
- EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion,
+ @Released EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion,
myId, myId, i, op.removeAllData, false, op.getBaseEvent()
.getContext(), false, !op.getBaseEvent()
.isGenerateCallbacks());
+ try {
ev.setRemoveAllOperation(op);
// below if condition returns true on secondary when TXState is
// updated in preCommit only on secondary
@@ -664,6 +666,9 @@ public class DistTXState extends TXState {
} catch (EntryNotFoundException ignore) {
}
successfulOps.addKeyAndVersion(op.removeAllData[i].key, null);
+ } finally {
+ ev.release();
+ }
}
}
}, op.getBaseEvent().getEventId());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
index 10d892a..33bec1c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
@@ -240,6 +240,7 @@ public final class DistTXStateOnCoordinator extends DistTXState implements
public void postPutAll(DistributedPutAllOperation putallOp,
VersionedObjectList successfulPuts, LocalRegion region) {
super.postPutAll(putallOp, successfulPuts, region);
+ // TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createPutAllEvent(putallOp, region,
Operation.PUTALL_CREATE, putallOp.getBaseEvent().getKey(), putallOp
.getBaseEvent().getValue());
@@ -252,6 +253,7 @@ public final class DistTXStateOnCoordinator extends DistTXState implements
public void postRemoveAll(DistributedRemoveAllOperation removeAllOp,
VersionedObjectList successfulOps, LocalRegion region) {
super.postRemoveAll(removeAllOp, successfulOps, region);
+ // TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(removeAllOp,
region, removeAllOp.getBaseEvent().getKey());
event.setEventId(removeAllOp.getBaseEvent().getEventId());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 75a1df7..67d63b1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -896,6 +896,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
DistributedPutAllOperation putAllForBucket =
bucketToPutallMap.get(bucketId);;
if (putAllForBucket == null) {
+ // TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
Operation.PUTALL_CREATE, key,
putallOp.putAllData[i].getValue());
@@ -976,6 +977,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
DistributedRemoveAllOperation removeAllForBucket =
bucketToRemoveAllMap.get(bucketId);
if (removeAllForBucket == null) {
+ // TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, region, key);
event.setEventId(op.removeAllData[i].getEventID());
removeAllForBucket = new DistributedRemoveAllOperation(
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index 79ae4ee..063f823 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -74,6 +74,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
@@ -1104,7 +1105,6 @@ public abstract class DistributedCacheOperation {
protected void basicProcess(DistributionManager dm, LocalRegion lclRgn) {
Throwable thr = null;
boolean sendReply = true;
- InternalCacheEvent event = null;
if (logger.isTraceEnabled()) {
logger.trace("DistributedCacheOperation.basicProcess: {}", this);
@@ -1140,7 +1140,7 @@ public abstract class DistributedCacheOperation {
return;
}
- event = createEvent(rgn);
+ @Released InternalCacheEvent event = createEvent(rgn);
try {
boolean isEntry = event.getOperation().isEntry();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
index 2e83cb8..4b4d4d3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
@@ -61,6 +61,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* Handles distribution of a Region.putall operation.
@@ -193,8 +195,9 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
public boolean hasNext() {
return DistributedPutAllOperation.this.putAllDataSize > position;
};
+ @Retained
public Object next() {
- EntryEventImpl ev = getEventForPosition(position);
+ @Retained EntryEventImpl ev = getEventForPosition(position);
position++;
return ev;
};
@@ -214,7 +217,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
}
}
-
+ @Retained
public EntryEventImpl getEventForPosition(int position) {
PutAllEntryData entry = this.putAllData[position];
if (entry == null) {
@@ -224,7 +227,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
return entry.event;
}
LocalRegion region = (LocalRegion)this.event.getRegion();
- EntryEventImpl ev = EntryEventImpl.create(
+ @Retained EntryEventImpl ev = EntryEventImpl.create(
region,
entry.getOp(),
entry.getKey(), null/* value */, this.event.getCallbackArgument(),
@@ -813,13 +816,17 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
}
FilterRoutingInfo consolidated = new FilterRoutingInfo();
for (int i=0; i<this.putAllData.length; i++) {
- EntryEventImpl ev = getEventForPosition(i);
+ @Released EntryEventImpl ev = getEventForPosition(i);
if (ev != null) {
+ try {
FilterRoutingInfo eventRouting = advisor.adviseFilterRouting(ev, cacheOpRecipients);
if (eventRouting != null) {
consolidated.addFilterInfo(eventRouting);
}
putAllData[i].filterRouting = eventRouting;
+ } finally {
+ ev.release();
+ }
}
}
// we need to create routing information for each PUT event
@@ -1093,11 +1100,12 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
* basicOperateOnRegion
*/
@Override
+ @Retained
protected InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException
{
// Gester: We have to specify eventId for the message of MAP
- EntryEventImpl event = EntryEventImpl.create(
+ @Retained EntryEventImpl event = EntryEventImpl.create(
rgn,
Operation.PUTALL_UPDATE /* op */, null /* key */, null/* value */,
this.callbackArg, true /* originRemote */, getSender());
@@ -1133,7 +1141,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
*/
public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn,
boolean requiresRegionContext, boolean fetchFromHDFS, boolean isPutDML) {
- EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(),
+ @Released EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(),
this.context, rgn,
requiresRegionContext, this.possibleDuplicate,
this.needsRouting, this.callbackArg, true, skipCallbacks);
@@ -1164,6 +1172,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
* @param callbackArg
* @return the event to be used in applying the element
*/
+ @Retained
public static EntryEventImpl createEntryEvent(PutAllEntryData entry,
InternalDistributedMember sender, ClientProxyMembershipID context,
DistributedRegion rgn, boolean requiresRegionContext,
@@ -1174,7 +1183,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
((KeyWithRegionContext)key).setRegionContext(rgn);
}
EventID evId = entry.getEventID();
- EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(),
+ @Retained EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(),
key, null/* value */, callbackArg,
originRemote, sender, !skipCallbacks,
evId);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 467efc6..addba8e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -121,6 +121,7 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewa
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
@@ -2442,13 +2443,12 @@ public class DistributedRegion extends LocalRegion implements
}
long lastModified = 0L;
boolean fromServer = false;
- EntryEventImpl event = null;
+ @Released EntryEventImpl event = null;
@Retained Object result = null;
try {
{
if (this.srp != null) {
- EntryEventImpl holder = EntryEventImpl.createVersionTagHolder();
- try {
+ VersionTagHolder holder = new VersionTagHolder();
Object value = this.srp.get(key, aCallbackArgument, holder);
fromServer = value != null;
if (fromServer) {
@@ -2461,9 +2461,6 @@ public class DistributedRegion extends LocalRegion implements
clientEvent.setVersionTag(holder.getVersionTag());
}
}
- } finally {
- holder.release();
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
index 70d39dc..34889aa 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
@@ -55,6 +55,8 @@ import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* Handles distribution of a Region.removeAll operation.
@@ -175,8 +177,9 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
public boolean hasNext() {
return DistributedRemoveAllOperation.this.removeAllDataSize > position;
};
+ @Retained
public Object next() {
- EntryEventImpl ev = getEventForPosition(position);
+ @Retained EntryEventImpl ev = getEventForPosition(position);
position++;
return ev;
};
@@ -196,6 +199,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
}
}
+ @Retained
public EntryEventImpl getEventForPosition(int position) {
RemoveAllEntryData entry = this.removeAllData[position];
if (entry == null) {
@@ -205,7 +209,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
return entry.event;
}
LocalRegion region = (LocalRegion)this.event.getRegion();
- EntryEventImpl ev = EntryEventImpl.create(
+ @Retained EntryEventImpl ev = EntryEventImpl.create(
region,
entry.getOp(),
entry.getKey(), null/* value */, this.event.getCallbackArgument(),
@@ -568,13 +572,17 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
}
FilterRoutingInfo consolidated = new FilterRoutingInfo();
for (int i=0; i<this.removeAllData.length; i++) {
- EntryEventImpl ev = getEventForPosition(i);
+ @Released EntryEventImpl ev = getEventForPosition(i);
if (ev != null) {
+ try {
FilterRoutingInfo eventRouting = advisor.adviseFilterRouting(ev, cacheOpRecipients);
if (eventRouting != null) {
consolidated.addFilterInfo(eventRouting);
}
removeAllData[i].filterRouting = eventRouting;
+ } finally {
+ ev.release();
+ }
}
}
// we need to create routing information for each PUT event
@@ -838,11 +846,12 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
* basicOperateOnRegion
*/
@Override
+ @Retained
protected InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException
{
// Gester: We have to specify eventId for the message of MAP
- EntryEventImpl event = EntryEventImpl.create(
+ @Retained EntryEventImpl event = EntryEventImpl.create(
rgn,
Operation.REMOVEALL_DESTROY, null /* key */, null/* value */,
this.callbackArg, true /* originRemote */, getSender());
@@ -878,7 +887,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
*/
public void doEntryRemove(RemoveAllEntryData entry, DistributedRegion rgn,
boolean requiresRegionContext) {
- EntryEventImpl ev = RemoveAllMessage.createEntryEvent(entry, getSender(),
+ @Released EntryEventImpl ev = RemoveAllMessage.createEntryEvent(entry, getSender(),
this.context, rgn,
requiresRegionContext, this.possibleDuplicate,
this.needsRouting, this.callbackArg, true, skipCallbacks);
@@ -918,6 +927,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
* @param callbackArg
* @return the event to be used in applying the element
*/
+ @Retained
public static EntryEventImpl createEntryEvent(RemoveAllEntryData entry,
InternalDistributedMember sender, ClientProxyMembershipID context,
DistributedRegion rgn, boolean requiresRegionContext,
@@ -928,7 +938,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
((KeyWithRegionContext)key).setRegionContext(rgn);
}
EventID evId = entry.getEventID();
- EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(),
+ @Retained EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(),
key, null/* value */, callbackArg,
originRemote, sender, !skipCallbacks,
evId);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index 3c87654..e3458e7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -214,29 +214,6 @@ public class EntryEventImpl
}
/**
- * create a new entry event that will be used for conveying version information
- * and anything else of use while processing another event
- * @return the empty event object
- */
- @Retained
- public static EntryEventImpl createVersionTagHolder() {
- return createVersionTagHolder(null);
- }
-
- /**
- * create a new entry event that will be used for conveying version information
- * and anything else of use while processing another event
- * @return the empty event object
- */
- @Retained
- public static EntryEventImpl createVersionTagHolder(VersionTag tag) {
- EntryEventImpl result = new EntryEventImpl();
- result.setVersionTag(tag);
- result.disallowOffHeapValues();
- return result;
- }
-
- /**
* Reads the contents of this message from the given input.
*/
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
@@ -389,18 +366,6 @@ public class EntryEventImpl
public EntryEventImpl(Object key2) {
this.keyInfo = new KeyInfo(key2, null, null);
}
-
- /**
- * This constructor is used to create a bridge event in server-side
- * command classes. Events created with this are not intended to be
- * used in cache operations.
- * @param id the identity of the client's event
- */
- @Retained
- public EntryEventImpl(EventID id) {
- this.eventID = id;
- this.offHeapOk = false;
- }
/**
* Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the
@@ -493,7 +458,7 @@ public class EntryEventImpl
DistributedPutAllOperation putAllOp, LocalRegion region,
Operation entryOp, Object entryKey, @Retained(ENTRY_EVENT_NEW_VALUE) Object entryNewValue)
{
- EntryEventImpl e;
+ @Retained EntryEventImpl e;
if (putAllOp != null) {
EntryEventImpl event = putAllOp.getBaseEvent();
if (event.isBridgeEvent()) {
@@ -515,11 +480,12 @@ public class EntryEventImpl
return e;
}
+ @Retained
protected static EntryEventImpl createRemoveAllEvent(
DistributedRemoveAllOperation op,
LocalRegion region,
Object entryKey) {
- EntryEventImpl e;
+ @Retained EntryEventImpl e;
final Operation entryOp = Operation.REMOVEALL_DESTROY;
if (op != null) {
EntryEventImpl event = op.getBaseEvent();
@@ -2995,6 +2961,7 @@ public class EntryEventImpl
}
/** returns a copy of this event with the additional fields for WAN conflict resolution */
+ @Retained
public TimestampedEntryEvent getTimestampedEvent(
final int newDSID, final int oldDSID,
final long newTimestamp, final long oldTimestamp) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
index cba7802..64b669a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
@@ -35,6 +35,7 @@ import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.internal.InternalStatisticsDisabledException;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
public class EntryExpiryTask extends ExpiryTask {
@@ -120,7 +121,7 @@ public class EntryExpiryTask extends ExpiryTask {
RegionEntry re = getCheckedRegionEntry();
Object key = re.getKey();
LocalRegion lr = getLocalRegion();
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
lr, Operation.EXPIRE_DESTROY, key, null,
createExpireEntryCallback(lr, key), false, lr.getMyId());
try {
@@ -142,7 +143,7 @@ public class EntryExpiryTask extends ExpiryTask {
RegionEntry re = getCheckedRegionEntry();
Object key = re.getKey();
LocalRegion lr = getLocalRegion();
- EntryEventImpl event = EntryEventImpl.create(lr,
+ @Released EntryEventImpl event = EntryEventImpl.create(lr,
Operation.EXPIRE_INVALIDATE, key, null,
createExpireEntryCallback(lr, key), false, lr.getMyId());
try {
@@ -162,7 +163,7 @@ public class EntryExpiryTask extends ExpiryTask {
RegionEntry re = getCheckedRegionEntry();
Object key = re.getKey();
LocalRegion lr = getLocalRegion();
- EntryEventImpl event = EntryEventImpl.create(lr,
+ @Released EntryEventImpl event = EntryEventImpl.create(lr,
Operation.EXPIRE_LOCAL_DESTROY, key, null,
createExpireEntryCallback(lr, key), false, lr.getMyId());
try {
@@ -182,7 +183,7 @@ public class EntryExpiryTask extends ExpiryTask {
RegionEntry re = getCheckedRegionEntry();
Object key = re.getKey();
LocalRegion lr = getLocalRegion();
- EntryEventImpl event = EntryEventImpl.create(lr,
+ @Released EntryEventImpl event = EntryEventImpl.create(lr,
Operation.EXPIRE_LOCAL_INVALIDATE, key, null,
createExpireEntryCallback(lr, key), false, lr.getMyId());
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventIDHolder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventIDHolder.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventIDHolder.java
new file mode 100644
index 0000000..6dd8586
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventIDHolder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+/**
+ * An {@link EntryEventImpl} that is an event that just holds an {@link EventID}.
+ * Unlike other EntryEventImpls this class does not need to be released
+ * since its values are never off-heap.
+ */
+public class EventIDHolder extends EntryEventImpl {
+ /**
+ * Creates a bridge event for use in server-side command classes: stores
+ * the given id via setEventId and disallows off-heap values. Events
+ * created with this are not intended to be used in cache operations.
+ * @param id the identity of the client's event
+ */
+ public EventIDHolder(EventID id) {
+ setEventId(id);
+ disallowOffHeapValues();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
index 90c4dc0..65e7a84 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
@@ -73,6 +73,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap;
/**
@@ -1173,7 +1174,8 @@ public class FilterProfile implements DataSerializableFixedID {
for (int idx=0; idx < size; idx++) {
PutAllEntryData pEntry = putAllData[idx];
if (pEntry != null) {
- final EntryEventImpl ev = dpao.getEventForPosition(idx);
+ @Released final EntryEventImpl ev = dpao.getEventForPosition(idx);
+ try {
FilterRoutingInfo fri = pEntry.filterRouting;
FilterInfo fi = null;
if (fri != null) {
@@ -1212,6 +1214,9 @@ public class FilterProfile implements DataSerializableFixedID {
fi.setInterestedClientsInv(clientsInv);
}
ev.setLocalFilterInfo(fi);
+ } finally {
+ ev.release();
+ }
}
}
}
@@ -1230,7 +1235,8 @@ public class FilterProfile implements DataSerializableFixedID {
for (int idx=0; idx < size; idx++) {
RemoveAllEntryData pEntry = removeAllData[idx];
if (pEntry != null) {
- final EntryEventImpl ev = op.getEventForPosition(idx);
+ @Released final EntryEventImpl ev = op.getEventForPosition(idx);
+ try {
FilterRoutingInfo fri = pEntry.filterRouting;
FilterInfo fi = null;
if (fri != null) {
@@ -1272,6 +1278,9 @@ public class FilterProfile implements DataSerializableFixedID {
// this.region.getLogWriterI18n().fine("setting event routing to " + fi);
// }
ev.setLocalFilterInfo(fi);
+ } finally {
+ ev.release();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
index fa29bf3..3896800 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
@@ -47,6 +47,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
/**
* This region is being implemented to suppress distribution of puts and to
@@ -229,7 +230,7 @@ public final class HARegion extends DistributedRegion
throws TimeoutException, CacheWriterException {
checkReadiness();
- EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
+ @Released EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
value, aCallbackArgument, false, getMyId());
try {
@@ -409,7 +410,7 @@ public final class HARegion extends DistributedRegion
op = Operation.LOCAL_LOAD_UPDATE;
}
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
this, op, key, value,
aCallbackArgument, false, getMyId(), generateCallbacks);
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index a72ca8e..b43ba2d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -931,7 +931,7 @@ public class InitialImageOperation {
.create((byte[])tmpValue);
}
// dummy EntryEvent to pass for SQLF index maintenance
- final EntryEventImpl ev = EntryEventImpl.create(this.region,
+ @Released final EntryEventImpl ev = EntryEventImpl.create(this.region,
Operation.CREATE, null, null, null, true, null, false, false);
try {
ev.setKeyInfo(this.region.getKeyInfo(entry.key,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InvalidateOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InvalidateOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InvalidateOperation.java
index de6adda..6e1d91e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InvalidateOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InvalidateOperation.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
* Handles distribution messaging for invalidating an entry in a region.
@@ -108,12 +109,13 @@ public class InvalidateOperation extends DistributedCacheOperation
}
@Override
+ @Retained
protected InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException {
if (rgn.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(rgn);
}
- EntryEventImpl ev = EntryEventImpl.create(
+ @Retained EntryEventImpl ev = EntryEventImpl.create(
rgn, getOperation(), this.key,
null, this.callbackArg, true, getSender());
ev.setEventId(this.eventId);
@@ -194,6 +196,7 @@ public class InvalidateOperation extends DistributedCacheOperation
transient ClientProxyMembershipID context;
@Override
+ @Retained
protected InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException {
EntryEventImpl event = (EntryEventImpl)super.createEvent(rgn);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70221350/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 3ff48bb..4360b2a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -206,6 +206,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
@@ -1119,12 +1120,12 @@ public class LocalRegion extends AbstractRegion
public final void create(Object key, Object value, Object aCallbackArgument)
throws TimeoutException, EntryExistsException, CacheWriterException {
long startPut = CachePerfStats.getStatTime();
- EntryEventImpl event = newCreateEntryEvent(key, value, aCallbackArgument);
+ @Released EntryEventImpl event = newCreateEntryEvent(key, value, aCallbackArgument);
validatedCreate(event, startPut);
- // TODO OFFHEAP: validatedCreate calls freeOffHeapResources
+ // TODO OFFHEAP: validatedCreate calls release
}
- public final void validatedCreate(EntryEventImpl event, long startPut)
+ public final void validatedCreate(@Released EntryEventImpl event, long startPut)
throws TimeoutException, EntryExistsException, CacheWriterException {
try {
@@ -1152,14 +1153,13 @@ public class LocalRegion extends AbstractRegion
}
}
} finally {
-
event.release();
-
}
}
// split into a separate newCreateEntryEvent since SQLFabric may need to
// manipulate event before doing the put (e.g. posDup flag)
+ @Retained
public final EntryEventImpl newCreateEntryEvent(Object key, Object value,
Object aCallbackArgument) {
@@ -1191,16 +1191,16 @@ public class LocalRegion extends AbstractRegion
public final Object destroy(Object key, Object aCallbackArgument)
throws TimeoutException, EntryNotFoundException, CacheWriterException {
- EntryEventImpl event = newDestroyEntryEvent(key, aCallbackArgument);
+ @Released EntryEventImpl event = newDestroyEntryEvent(key, aCallbackArgument);
return validatedDestroy(key, event);
- // TODO OFFHEAP: validatedDestroy calls freeOffHeapResources
+ // TODO OFFHEAP: validatedDestroy calls release
}
/**
* Destroys entry without performing validations. Call this after validating
* key, callback arg, and runtime state.
*/
- public Object validatedDestroy(Object key, EntryEventImpl event)
+ public Object validatedDestroy(Object key, @Released EntryEventImpl event)
throws TimeoutException, EntryNotFoundException, CacheWriterException
{
try {
@@ -1221,6 +1221,7 @@ public class LocalRegion extends AbstractRegion
// split into a separate newDestroyEntryEvent since SQLFabric may need to
// manipulate event before doing the put (e.g. posDup flag)
+ @Retained
public final EntryEventImpl newDestroyEntryEvent(Object key,
Object aCallbackArgument) {
validateKey(key);
@@ -1731,15 +1732,15 @@ public class LocalRegion extends AbstractRegion
public Object put(Object key, Object value, Object aCallbackArgument)
throws TimeoutException, CacheWriterException {
long startPut = CachePerfStats.getStatTime();
- EntryEventImpl event = newUpdateEntryEvent(key, value, aCallbackArgument);
+ @Released EntryEventImpl event = newUpdateEntryEvent(key, value, aCallbackArgument);
//Since Sqlfire directly calls validatedPut, the freeing is done in
// validatedPut
return validatedPut(event, startPut);
- // TODO OFFHEAP: validatedPut calls freeOffHeapResources
+ // TODO OFFHEAP: validatedPut calls release
}
- public final Object validatedPut(EntryEventImpl event, long startPut)
+ public final Object validatedPut(@Released EntryEventImpl event, long startPut)
throws TimeoutException, CacheWriterException {
try {
@@ -1776,6 +1777,7 @@ public class LocalRegion extends AbstractRegion
// split into a separate newUpdateEntryEvent since SQLFabric may need to
// manipulate event before doing the put (e.g. posDup flag)
+ @Retained
public final EntryEventImpl newUpdateEntryEvent(Object key, Object value,
Object aCallbackArgument) {
@@ -1792,7 +1794,7 @@ public class LocalRegion extends AbstractRegion
// was modified to call the other EntryEventImpl constructor so that
// an id will be generated by default. Null was passed in anyway.
// generate EventID
- final EntryEventImpl event = EntryEventImpl.create(
+ @Retained final EntryEventImpl event = EntryEventImpl.create(
this, Operation.UPDATE, key,
value, aCallbackArgument, false, getMyId());
boolean eventReturned = false;
@@ -1808,6 +1810,7 @@ public class LocalRegion extends AbstractRegion
* Creates an EntryEventImpl that is optimized to not fetch data from HDFS.
* This is meant to be used by PUT dml from GemFireXD.
*/
+ @Retained
public final EntryEventImpl newPutEntryEvent(Object key, Object value,
Object aCallbackArgument) {
EntryEventImpl ev = newUpdateEntryEvent(key, value, aCallbackArgument);
@@ -2392,7 +2395,7 @@ public class LocalRegion extends AbstractRegion
protected void validatedInvalidate(Object key, Object aCallbackArgument)
throws TimeoutException, EntryNotFoundException
{
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
this, Operation.INVALIDATE,
key, null, aCallbackArgument, false, getMyId());
try {
@@ -2411,7 +2414,7 @@ public class LocalRegion extends AbstractRegion
validateKey(key);
checkReadiness();
checkForNoAccess();
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
this,
Operation.LOCAL_DESTROY, key, null, aCallbackArgument, false, getMyId());
if (generateEventID()) {
@@ -2482,7 +2485,7 @@ public class LocalRegion extends AbstractRegion
checkReadiness();
checkForNoAccess();
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
this,
Operation.LOCAL_INVALIDATE, key, null/* newValue */, callbackArgument,
false, getMyId());
@@ -3020,7 +3023,7 @@ public class LocalRegion extends AbstractRegion
final Object aCallbackArgument = keyInfo.getCallbackArg();
Object value = null;
boolean fromServer = false;
- EntryEventImpl holder = null;
+ VersionTagHolder holder = null;
/*
* First lets try the server
@@ -3028,13 +3031,9 @@ public class LocalRegion extends AbstractRegion
{
ServerRegionProxy mySRP = getServerProxy();
if (mySRP != null) {
- holder = EntryEventImpl.createVersionTagHolder();
- try {
- value = mySRP.get(key, aCallbackArgument, holder);
- fromServer = value != null;
- } finally {
- holder.release();
- }
+ holder = new VersionTagHolder();
+ value = mySRP.get(key, aCallbackArgument, holder);
+ fromServer = value != null;
}
}
@@ -3085,7 +3084,7 @@ public class LocalRegion extends AbstractRegion
op = Operation.LOCAL_LOAD_UPDATE;
}
- EntryEventImpl event
+ @Released EntryEventImpl event
= EntryEventImpl.create(this, op, key, value, aCallbackArgument,
false, getMyId(), generateCallbacks);
try {
@@ -4434,7 +4433,7 @@ public class LocalRegion extends AbstractRegion
}
checkReadiness();
validateKey(key);
- EntryEventImpl event = EntryEventImpl.create(this, Operation.LOCAL_DESTROY,
+ @Released EntryEventImpl event = EntryEventImpl.create(this, Operation.LOCAL_DESTROY,
key, false, getMyId(), false /* generateCallbacks */, true);
try {
basicDestroy(event,
@@ -5580,7 +5579,7 @@ public class LocalRegion extends AbstractRegion
//to get Hash. If the partitioning column is different from primary key,
//the resolver for Sqlfabric is not able to obtain the hash object used for creation of KeyInfo
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.CREATE, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.CREATE, key,
value, callbackArg, false /* origin remote */, client.getDistributedMember(),
true /* generateCallbacks */,
eventId);
@@ -5654,7 +5653,7 @@ public class LocalRegion extends AbstractRegion
}
}
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
null /* new value */, callbackArg,
false /* origin remote */, memberId.getDistributedMember(),
true /* generateCallbacks */,
@@ -5801,7 +5800,7 @@ public class LocalRegion extends AbstractRegion
concurrencyConfigurationCheck(versionTag);
// Create an event and put the entry
- EntryEventImpl event =
+ @Released EntryEventImpl event =
EntryEventImpl.create(this,
Operation.INVALIDATE,
key, null /* newValue */,
@@ -5856,7 +5855,7 @@ public class LocalRegion extends AbstractRegion
concurrencyConfigurationCheck(versionTag);
// Create an event and destroy the entry
- EntryEventImpl event =
+ @Released EntryEventImpl event =
EntryEventImpl.create(this,
Operation.DESTROY,
key, null /* newValue */,
@@ -5938,7 +5937,7 @@ public class LocalRegion extends AbstractRegion
}
// Create an event and put the entry
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.DESTROY, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.DESTROY, key,
null /* new value */, callbackArg,
false /* origin remote */, memberId.getDistributedMember(),
true /* generateCallbacks */,
@@ -5978,7 +5977,7 @@ public class LocalRegion extends AbstractRegion
}
// Create an event and put the entry
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.INVALIDATE, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.INVALIDATE, key,
null /* new value */, callbackArg,
false /* origin remote */, memberId.getDistributedMember(),
true /* generateCallbacks */,
@@ -6004,7 +6003,7 @@ public class LocalRegion extends AbstractRegion
ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent) {
// Create an event and update version stamp of the entry
- EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE_VERSION_STAMP, key,
+ @Released EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE_VERSION_STAMP, key,
null /* new value */, null /*callbackArg*/,
false /* origin remote */, memberId.getDistributedMember(),
false /* generateCallbacks */,
@@ -6368,8 +6367,7 @@ public class LocalRegion extends AbstractRegion
protected void notifyTimestampsToGateways(EntryEventImpl event) {
// Create updateTimeStampEvent from event.
- EntryEventImpl updateTimeStampEvent = EntryEventImpl.createVersionTagHolder(event.getVersionTag());
- try {
+ VersionTagHolder updateTimeStampEvent = new VersionTagHolder(event.getVersionTag());
updateTimeStampEvent.setOperation(Operation.UPDATE_VERSION_STAMP);
updateTimeStampEvent.setKeyInfo(event.getKeyInfo());
updateTimeStampEvent.setGenerateCallbacks(false);
@@ -6394,9 +6392,6 @@ public class LocalRegion extends AbstractRegion
updateTimeStampEvent.setRegion(event.getRegion());
notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, updateTimeStampEvent);
}
- } finally {
- updateTimeStampEvent.release();
- }
}
@@ -7276,8 +7271,9 @@ public class LocalRegion extends AbstractRegion
* @param key - the key that this event is related to
* @return an event for EVICT_DESTROY
*/
+ @Retained
protected EntryEventImpl generateEvictDestroyEvent(final Object key) {
- EntryEventImpl event = EntryEventImpl.create(
+ @Retained EntryEventImpl event = EntryEventImpl.create(
this, Operation.EVICT_DESTROY, key, null/* newValue */,
null, false, getMyId());
// Fix for bug#36963
@@ -7287,8 +7283,10 @@ public class LocalRegion extends AbstractRegion
event.setFetchFromHDFS(false);
return event;
}
- protected EntryEventImpl generateCustomEvictDestroyEvent(final Object key) {
- EntryEventImpl event = EntryEventImpl.create(
+
+ @Retained
+ protected EntryEventImpl generateCustomEvictDestroyEvent(final Object key) {
+ @Retained EntryEventImpl event = EntryEventImpl.create(
this, Operation.CUSTOM_EVICT_DESTROY, key, null/* newValue */,
null, false, getMyId());
@@ -7307,7 +7305,7 @@ public class LocalRegion extends AbstractRegion
{
checkReadiness();
- final EntryEventImpl event =
+ @Released final EntryEventImpl event =
generateEvictDestroyEvent(entry.getKey());
try {
return mapDestroy(event,
@@ -8207,7 +8205,7 @@ public class LocalRegion extends AbstractRegion
for (Iterator itr = keySet().iterator(); itr.hasNext();) {
try {
//EventID will not be generated by this constructor
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
this, op, itr.next() /*key*/,
null/* newValue */, null/* callbackArg */, rgnEvent.isOriginRemote(),
rgnEvent.getDistributedMember());
@@ -9267,6 +9265,10 @@ public class LocalRegion extends AbstractRegion
class EventDispatcher implements Runnable
{
+ /**
+ * released by the release method
+ */
+ @Retained
InternalCacheEvent event;
EnumListenerEvent op;
@@ -10091,7 +10093,7 @@ public class LocalRegion extends AbstractRegion
// correct events will be delivered to any callbacks we have.
long startPut = CachePerfStats.getStatTime();
validateKey(key);
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
this, Operation.LOCAL_LOAD_CREATE, key, value,
callback, false, getMyId(), true);
try {
@@ -10186,7 +10188,7 @@ public class LocalRegion extends AbstractRegion
callbackArg = new GatewaySenderEventCallbackArgument(callbackArg);
}
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.PUTALL_CREATE, null,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.PUTALL_CREATE, null,
null /* new value */, callbackArg,
false /* origin remote */, memberId.getDistributedMember(),
!skipCallbacks /* generateCallbacks */,
@@ -10222,7 +10224,7 @@ public class LocalRegion extends AbstractRegion
callbackArg = new GatewaySenderEventCallbackArgument(callbackArg);
}
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.REMOVEALL_DESTROY, null,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.REMOVEALL_DESTROY, null,
null /* new value */, callbackArg,
false /* origin remote */, memberId.getDistributedMember(),
true /* generateCallbacks */,
@@ -10246,7 +10248,7 @@ public class LocalRegion extends AbstractRegion
long startPut = CachePerfStats.getStatTime();
// generateCallbacks == false
- EntryEventImpl event = EntryEventImpl.create(this, Operation.PUTALL_CREATE,
+ @Released EntryEventImpl event = EntryEventImpl.create(this, Operation.PUTALL_CREATE,
null, null, null, true, getMyId(), !skipCallbacks);
try {
DistributedPutAllOperation putAllOp = new DistributedPutAllOperation(event, map.size(), false);
@@ -10394,7 +10396,7 @@ public class LocalRegion extends AbstractRegion
Runnable r = new Runnable() {
public void run() {
int offset = 0;
- EntryEventImpl tagHolder = EntryEventImpl.createVersionTagHolder();
+ VersionTagHolder tagHolder = new VersionTagHolder();
while (iterator.hasNext()) {
stopper.checkCancelInProgress(null);
Map.Entry mapEntry = (Map.Entry)iterator.next();
@@ -10445,11 +10447,7 @@ public class LocalRegion extends AbstractRegion
}
if (!overwritten) {
- try {
- basicEntryPutAll(key, value, dpao, offset, tagHolder);
- } finally {
- tagHolder.release();
- }
+ basicEntryPutAll(key, value, dpao, offset, tagHolder);
}
// now we must check again since the cache may have closed during
// distribution (causing this process to not receive and queue the
@@ -10613,7 +10611,7 @@ public class LocalRegion extends AbstractRegion
Runnable r = new Runnable() {
public void run() {
int offset = 0;
- EntryEventImpl tagHolder = EntryEventImpl.createVersionTagHolder();
+ VersionTagHolder tagHolder = new VersionTagHolder();
while (iterator.hasNext()) {
stopper.checkCancelInProgress(null);
Object key;
@@ -10766,6 +10764,7 @@ public class LocalRegion extends AbstractRegion
// Create a dummy event for the PutAll operation. Always create a
// PutAll operation, even if there is no distribution, so that individual
// events can be tracked and handed off to callbacks in postPutAll
+ // No need for release since disallowOffHeapValues called.
final EntryEventImpl event = EntryEventImpl.create(this,
Operation.PUTALL_CREATE, null, null, callbackArg, true, getMyId());
@@ -10795,6 +10794,7 @@ public class LocalRegion extends AbstractRegion
// Create a dummy event for the removeAll operation. Always create a
// removeAll operation, even if there is no distribution, so that individual
// events can be tracked and handed off to callbacks in postRemoveAll
+ // No need for release since disallowOffHeapValues called.
final EntryEventImpl event = EntryEventImpl.create(this, Operation.REMOVEALL_DESTROY, null,
null/* newValue */, callbackArg, false, getMyId());
event.disallowOffHeapValues();
@@ -10826,7 +10826,7 @@ public class LocalRegion extends AbstractRegion
validateArguments(key, value, null);
// event is marked as a PUTALL_CREATE but if the entry exists it
// will be changed to a PUTALL_UPDATE later on.
- EntryEventImpl event = EntryEventImpl.createPutAllEvent(
+ @Released EntryEventImpl event = EntryEventImpl.createPutAllEvent(
putallOp, this, Operation.PUTALL_CREATE, key, value);
try {
@@ -10864,7 +10864,7 @@ public class LocalRegion extends AbstractRegion
assert op != null;
checkReadiness();
validateKey(key);
- EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, this, key);
+ @Released EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, this, key);
try {
if (tagHolder != null) {
event.setVersionTag(tagHolder.getVersionTag());
@@ -10918,13 +10918,17 @@ public class LocalRegion extends AbstractRegion
successfulKeys.add(key);
}
for (Iterator it=putallOp.eventIterator(); it.hasNext(); ) {
- EntryEventImpl event = (EntryEventImpl)it.next();
+ @Released EntryEventImpl event = (EntryEventImpl)it.next();
+ try {
if (successfulKeys.contains(event.getKey())) {
EnumListenerEvent op = event.getOperation().isCreate() ? EnumListenerEvent.AFTER_CREATE
: EnumListenerEvent.AFTER_UPDATE;
invokePutCallbacks(op, event, !event.callbacksInvoked() && !event.isPossibleDuplicate(),
false /* We must notify gateways inside RegionEntry lock, NOT here, to preserve the order of events sent by gateways for same key*/);
}
+ } finally {
+ event.release();
+ }
}
}
@@ -10941,11 +10945,15 @@ public class LocalRegion extends AbstractRegion
successfulKeys.add(key);
}
for (Iterator it=op.eventIterator(); it.hasNext(); ) {
- EntryEventImpl event = (EntryEventImpl)it.next();
+ @Released EntryEventImpl event = (EntryEventImpl)it.next();
+ try {
if (successfulKeys.contains(event.getKey())) {
invokeDestroyCallbacks(EnumListenerEvent.AFTER_DESTROY, event, !event.callbacksInvoked() && !event.isPossibleDuplicate(),
false /* We must notify gateways inside RegionEntry lock, NOT here, to preserve the order of events sent by gateways for same key*/);
}
+ } finally {
+ event.release();
+ }
}
}
@@ -11607,8 +11615,7 @@ public class LocalRegion extends AbstractRegion
}
public void destroyRecoveredEntry(Object key) {
- EntryEventImpl event = EntryEventImpl.create(
- this,
+ @Released EntryEventImpl event = EntryEventImpl.create(this,
Operation.LOCAL_DESTROY, key, null, null, false, getMyId(), false);
try {
event.inhibitCacheListenerNotification(true);
@@ -12337,7 +12344,7 @@ public class LocalRegion extends AbstractRegion
// was modified to call the other EntryEventImpl constructor so that
// an id will be generated by default. Null was passed in anyway.
// generate EventID
- EntryEventImpl event = EntryEventImpl.create(
+ @Released EntryEventImpl event = EntryEventImpl.create(
this, Operation.PUT_IF_ABSENT, key,
value, callbackArgument, false, getMyId());
final Object oldValue = null;
@@ -12398,7 +12405,7 @@ public class LocalRegion extends AbstractRegion
if (value == null) {
value = Token.INVALID;
}
- EntryEventImpl event = EntryEventImpl.create(this,
+ @Released EntryEventImpl event = EntryEventImpl.create(this,
Operation.REMOVE,
key,
null, // newValue
@@ -12455,7 +12462,7 @@ public class LocalRegion extends AbstractRegion
validateArguments(key, newValue, callbackArg);
checkReadiness();
checkForLimitedOrNoAccess();
- EntryEventImpl event = EntryEventImpl.create(this,
+ @Released EntryEventImpl event = EntryEventImpl.create(this,
Operation.REPLACE,
key,
newValue,
@@ -12523,7 +12530,7 @@ public class LocalRegion extends AbstractRegion
validateArguments(key, value, callbackArg);
checkReadiness();
checkForLimitedOrNoAccess();
- EntryEventImpl event = EntryEventImpl.create(this,
+ @Released EntryEventImpl event = EntryEventImpl.create(this,
Operation.REPLACE,
key,
value,
@@ -12571,7 +12578,7 @@ public class LocalRegion extends AbstractRegion
callbackArg = new GatewaySenderEventCallbackArgument(callbackArg);
}
}
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.PUT_IF_ABSENT, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.PUT_IF_ABSENT, key,
null /* new value */, callbackArg,
false /* origin remote */, client.getDistributedMember(),
true /* generateCallbacks */,
@@ -12648,7 +12655,7 @@ public class LocalRegion extends AbstractRegion
callbackArg = new GatewaySenderEventCallbackArgument(callbackArg);
}
}
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.REPLACE, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.REPLACE, key,
null /* new value */, callbackArg,
false /* origin remote */, client.getDistributedMember(),
true /* generateCallbacks */,
@@ -12705,7 +12712,7 @@ public class LocalRegion extends AbstractRegion
callbackArg = new GatewaySenderEventCallbackArgument(callbackArg);
}
}
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.REPLACE, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.REPLACE, key,
null /* new value */, callbackArg,
false /* origin remote */, client.getDistributedMember(),
true /* generateCallbacks */,
@@ -12771,7 +12778,7 @@ public class LocalRegion extends AbstractRegion
}
// Create an event and put the entry
- final EntryEventImpl event = EntryEventImpl.create(this, Operation.REMOVE, key,
+ @Released final EntryEventImpl event = EntryEventImpl.create(this, Operation.REMOVE, key,
null /* new value */, callbackArg,
false /* origin remote */, memberId.getDistributedMember(),
true /* generateCallbacks */,