You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/05/11 16:29:13 UTC
[geode] branch develop updated: GEODE-5172: refactor txApplyPut to
reuse RegionMapPut (#1917)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6839dca GEODE-5172: refactor txApplyPut to reuse RegionMapPut (#1917)
6839dca is described below
commit 6839dca5b3c63f2a068998d890ee56cf3c21e5ef
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Fri May 11 09:29:09 2018 -0700
GEODE-5172: refactor txApplyPut to reuse RegionMapPut (#1917)
AbstractRegionMapPut has been introduced and has the common code used for both a non-tx put (RegionMapPut) and a transaction put being committed (RegionMapCommitPut).
RegionMapCommitPut is used by txApplyPut.
---
.../apache/geode/cache/DynamicRegionFactory.java | 2 +-
.../java/org/apache/geode/cache/TransactionId.java | 3 +
.../geode/internal/cache/AbstractRegion.java | 3 +-
.../geode/internal/cache/AbstractRegionMap.java | 406 +-----------
.../geode/internal/cache/DistributedRegion.java | 5 +
.../geode/internal/cache/EntryEventImpl.java | 14 +-
.../org/apache/geode/internal/cache/HARegion.java | 2 +-
.../geode/internal/cache/InternalRegion.java | 15 +
.../apache/geode/internal/cache/LocalRegion.java | 14 +-
.../geode/internal/cache/PartitionedRegion.java | 2 +-
.../java/org/apache/geode/internal/cache/TXId.java | 1 +
.../apache/geode/internal/cache/TXRmtEvent.java | 10 +-
.../internal/cache/map/AbstractRegionMapPut.java | 321 ++++++++++
.../geode/internal/cache/map/FocusedRegionMap.java | 9 +
.../internal/cache/map/RegionMapCommitPut.java | 297 +++++++++
.../geode/internal/cache/map/RegionMapPut.java | 599 ++++++------------
.../wan/parallel/ParallelGatewaySenderQueue.java | 2 +-
.../cache/wan/serial/SerialGatewaySenderQueue.java | 2 +-
.../internal/cache/AbstractRegionMapTest.java | 16 +-
.../cache/map/AbstractRegionMapPutTest.java | 352 +++++++++++
.../internal/cache/map/RegionMapCommitPutTest.java | 704 +++++++++++++++++++++
.../geode/internal/cache/map/RegionMapPutTest.java | 156 ++++-
22 files changed, 2100 insertions(+), 835 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/DynamicRegionFactory.java b/geode-core/src/main/java/org/apache/geode/cache/DynamicRegionFactory.java
index 7053b73..a4c4204 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/DynamicRegionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/DynamicRegionFactory.java
@@ -887,7 +887,7 @@ public abstract class DynamicRegionFactory {
// while internal, its contents should be communicated with bridge clients
@Override
- protected boolean shouldNotifyBridgeClients() {
+ public boolean shouldNotifyBridgeClients() {
return getCache().getCacheServers().size() > 0;
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/TransactionId.java b/geode-core/src/main/java/org/apache/geode/cache/TransactionId.java
index 6f55137..9fc8680 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/TransactionId.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/TransactionId.java
@@ -17,6 +17,8 @@ package org.apache.geode.cache;
import java.io.Externalizable;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
/**
* The TransactionId interface is a "marker" interface that represents a unique GemFire transaction.
*
@@ -27,4 +29,5 @@ import java.io.Externalizable;
* @see CacheTransactionManager#getTransactionId
*/
public interface TransactionId extends Externalizable {
+ InternalDistributedMember getMemberId();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
index bb527e1..35ff366 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
@@ -1580,7 +1580,8 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
*
* @since GemFire 5.0
*/
- boolean isAllEvents() {
+ @Override
+ public boolean isAllEvents() {
return getDataPolicy().withReplication()
|| getSubscriptionAttributes().getInterestPolicy().isAll();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 0e39632..50b5371 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -54,6 +54,7 @@ import org.apache.geode.internal.cache.eviction.EvictableEntry;
import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.map.CacheModificationLock;
import org.apache.geode.internal.cache.map.FocusedRegionMap;
+import org.apache.geode.internal.cache.map.RegionMapCommitPut;
import org.apache.geode.internal.cache.map.RegionMapDestroy;
import org.apache.geode.internal.cache.map.RegionMapPut;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
@@ -334,7 +335,8 @@ public abstract class AbstractRegionMap
}
}
- protected void incEntryCount(int delta) {
+ @Override
+ public void incEntryCount(int delta) {
LocalRegion lr = _getOwner();
if (lr != null) {
CachePerfStats stats = lr.getCachePerfStats();
@@ -1088,7 +1090,7 @@ public abstract class AbstractRegionMap
}
boolean clearOccured = false;
try {
- processAndGenerateTXVersionTag(owner, callbackEvent, re, txEntryState);
+ processAndGenerateTXVersionTag(callbackEvent, re, txEntryState);
if (inTokenMode) {
if (oldValue == Token.TOMBSTONE) {
owner.unscheduleTombstone(re);
@@ -1177,7 +1179,7 @@ public abstract class AbstractRegionMap
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
- processAndGenerateTXVersionTag(owner, callbackEvent, oldRe, txEntryState);
+ processAndGenerateTXVersionTag(callbackEvent, oldRe, txEntryState);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
@@ -1236,7 +1238,7 @@ public abstract class AbstractRegionMap
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
- processAndGenerateTXVersionTag(owner, callbackEvent, newRe, txEntryState);
+ processAndGenerateTXVersionTag(callbackEvent, newRe, txEntryState);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
@@ -1880,7 +1882,7 @@ public abstract class AbstractRegionMap
aCallbackArgument);
}
oldRe.setValueResultOfSearch(false);
- processAndGenerateTXVersionTag(owner, callbackEvent, oldRe, txEntryState);
+ processAndGenerateTXVersionTag(callbackEvent, oldRe, txEntryState);
boolean clearOccured = false;
try {
oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, true));
@@ -1926,7 +1928,7 @@ public abstract class AbstractRegionMap
newRe.setValueResultOfSearch(false);
boolean clearOccured = false;
try {
- processAndGenerateTXVersionTag(owner, callbackEvent, newRe, txEntryState);
+ processAndGenerateTXVersionTag(callbackEvent, newRe, txEntryState);
newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, true));
EntryLogger.logTXInvalidate(_getOwnerObject(), key);
owner.updateSizeOnCreate(newRe.getKey(), 0);// we are putting in a new invalidated
@@ -1989,7 +1991,7 @@ public abstract class AbstractRegionMap
txEvent.addInvalidate(owner, re, re.getKey(), newValue, aCallbackArgument);
}
re.setValueResultOfSearch(false);
- processAndGenerateTXVersionTag(owner, callbackEvent, re, txEntryState);
+ processAndGenerateTXVersionTag(callbackEvent, re, txEntryState);
boolean clearOccured = false;
try {
re.setValue(owner, re.prepareValueForCache(owner, newValue, true));
@@ -2102,389 +2104,24 @@ public abstract class AbstractRegionMap
}
@Override
- public void txApplyPut(Operation p_putOp, Object key, Object nv, boolean didDestroy,
+ public void txApplyPut(Operation putOp, Object key, Object nv, boolean didDestroy,
TransactionId txId, TXRmtEvent txEvent, EventID eventId, Object aCallbackArgument,
List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo filterRoutingInfo,
ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag,
long tailKey) {
assert pendingCallbacks != null;
final LocalRegion owner = _getOwner();
- if (owner == null) {
- // "fix" for bug 32440
- Assert.assertTrue(false, "The owner for RegionMap " + this + " is null");
- }
- final boolean hasRemoteOrigin = !((TXId) txId).getMemberId().equals(owner.getMyId());
- final boolean isTXHost = txEntryState != null;
- final boolean isClientTXOriginator = owner.getCache().isClient() && !hasRemoteOrigin;
- final boolean isRegionReady = owner.isInitialized();
- boolean onlyExisting = false;
- if (hasRemoteOrigin && !isTXHost && !isClientTXOriginator) {
- // If we are not a mirror then only apply the update to existing
- // entries
- //
- // If we are a mirror then then only apply the update to
- // existing entries when the operation is an update and we
- // are initialized.
- // Otherwise use the standard create/update logic
- if (!owner.isAllEvents() || (!p_putOp.isCreate() && isRegionReady)) {
- onlyExisting = true;
- }
- }
- TxApplyPutContext txApplyPutContext = null;
@Released
final EntryEventImpl callbackEvent =
- createTransactionCallbackEvent(owner, p_putOp, key, nv, txId, txEvent, eventId,
+ createTransactionCallbackEvent(owner, putOp, key, nv, txId, txEvent, eventId,
aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
- boolean invokeCallbacks = shouldCreateCallbackEvent(owner, isRegionReady);
- try {
- Object newValue = nv;
- if (logger.isDebugEnabled()) {
- logger.debug("txApplyPut callbackEvent={}", callbackEvent);
- }
-
- if (owner.isUsedForPartitionedRegionBucket()) {
- newValue = EntryEventImpl.getCachedDeserializable(nv, callbackEvent);
- txHandleWANEvent(owner, callbackEvent, txEntryState);
- }
-
- txApplyPutContext = new TxApplyPutContext(false, false, p_putOp, callbackEvent, onlyExisting,
- newValue, didDestroy, txEvent, aCallbackArgument, pendingCallbacks, txEntryState,
- hasRemoteOrigin, invokeCallbacks);
-
- // Fix for Bug #44431. We do NOT want to update the region and wait
- // later for index INIT as region.clear() can cause inconsistency if
- // happened in parallel as it also does index INIT.
- IndexManager oqlIndexManager = owner.getIndexManager();
- if (oqlIndexManager != null) {
- oqlIndexManager.waitForIndexInit();
- }
- try {
- do {
- txApplyPutContext.setRegionEntry(null);
- if (!txApplyPutFindExistingEntry(txApplyPutContext, onlyExisting)) {
- break;
- }
- txApplyPutCreateNewEntryIfNeeded(txApplyPutContext);
- } while (!addRegionEntryToMapAndDoTxPut(txApplyPutContext));
-
- if (onlyExisting && !txApplyPutContext.isOpCompleted()) {
- if (didDestroy) {
- owner.txApplyPutHandleDidDestroy(key);
- }
- if (invokeCallbacks) {
- callbackEvent.makeUpdate();
- owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, callbackEvent, false);
- }
- }
- if (owner.getConcurrencyChecksEnabled() && txEntryState != null && callbackEvent != null) {
- txEntryState.setVersionTag(callbackEvent.getVersionTag());
- }
- } catch (DiskAccessException dae) {
- owner.handleDiskAccessException(dae);
- throw dae;
- } finally {
- if (oqlIndexManager != null) {
- oqlIndexManager.countDownIndexUpdaters();
- }
- }
- } finally {
- if (txApplyPutContext == null || !txApplyPutContext.isCallbackEventInPending())
- callbackEvent.release();
- }
- }
-
- private boolean addRegionEntryToMapAndDoTxPut(TxApplyPutContext txApplyPutContext) {
- final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
- synchronized (regionEntry) {
- if (txApplyPutContext.isCreate()) {
- RegionEntry oldRe = putEntryIfAbsent(txApplyPutContext.getEvent().getKey(), regionEntry);
- if (oldRe != null) {
- txApplyPutContext.setCreate(false);
- txApplyPutContext.setRegionEntry(oldRe);
- }
- }
- return doTxPutOnRegionEntryInMap(txApplyPutContext);
- }
- }
-
- private boolean doTxPutOnRegionEntryInMap(TxApplyPutContext txApplyPutContext) {
- final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
-
- synchronized (regionEntry) {
- if (isRegionEntryRemoved(txApplyPutContext)) {
- return false;
- }
- try {
- txApplyPutRegionEntry(txApplyPutContext);
- return true;
- } finally {
- if (!txApplyPutContext.isOpCompleted() && txApplyPutContext.isCreate()) {
- removeEntry(txApplyPutContext.getEvent().getKey(), txApplyPutContext.getRegionEntry(),
- false);
- }
- }
- }
- }
-
- private boolean isRegionEntryRemoved(TxApplyPutContext txApplyPutContext) {
- final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
- if (regionEntry.isRemovedPhase2()) {
- _getOwner().getCachePerfStats().incRetries();
- getEntryMap().remove(txApplyPutContext.getEvent().getKey(), regionEntry);
- return true;
- } else {
- return false;
- }
- }
-
- private void txApplyPutCreateNewEntryIfNeeded(TxApplyPutContext txApplyPutContext) {
- txApplyPutContext.setCreate(txApplyPutContext.getRegionEntry() == null);
- if (txApplyPutContext.isCreate()) {
- final Object key = txApplyPutContext.getEvent().getKey();
- RegionEntry newEntry = getEntryFactory().createEntry(_getOwner(), key, Token.REMOVED_PHASE1);
- txApplyPutContext.setRegionEntry(newEntry);
- }
- }
-
- private boolean txApplyPutFindExistingEntry(TxApplyPutContext txApplyPutContext,
- boolean onlyExisting) {
- RegionEntry re = getEntry(txApplyPutContext.getEvent());
- if (onlyExisting) {
- if (re == null || re.isRemoved()) {
- return false;
- }
- }
- txApplyPutContext.setRegionEntry(re);
- return true;
- }
-
- private void txApplyPutRegionEntry(TxApplyPutContext txApplyPutContext) {
- final Object key = txApplyPutContext.getEvent().getKey();
- final boolean didDestroy = txApplyPutContext.isDidDestroy();
- final TXRmtEvent txEvent = txApplyPutContext.getTxEvent();
- final Object callbackArgument = txApplyPutContext.getCallbackArgument();
- final List<EntryEventImpl> pendingCallbacks = txApplyPutContext.getPendingCallbacks();
- final TXEntryState txEntryState = txApplyPutContext.getTxEntryState();
- final LocalRegion owner = _getOwner();
- final Object newValue = txApplyPutContext.getNewValue();
- final boolean hasRemoteOrigin = txApplyPutContext.isHasRemoteOrigin();
- final EntryEventImpl callbackEvent = txApplyPutContext.getEvent();
- final boolean invokeCallbacks = txApplyPutContext.isInvokeCallbacks();
- final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
- final boolean onlyExisting = txApplyPutContext.isOnlyExisting();
- final boolean isCreate = txApplyPutContext.isCreate();
-
- if (isCreate) {
- txApplyPutContext.makeCreate();
- } else {
- if (onlyExisting && regionEntry.isRemoved()) {
- return;
- }
- txApplyPutContext.setOpCompleted(true);
- if (!regionEntry.isRemoved()) {
- txApplyPutContext.makeUpdate();
- }
- }
- final int oldSize = isCreate ? 0 : owner.calculateRegionEntryValueSize(regionEntry);
- final boolean oldIsRemoved = isCreate ? true : regionEntry.isDestroyedOrRemoved();
- if (callbackEvent != null) {
- callbackEvent.setRegionEntry(regionEntry);
- callbackEvent.setOldValue(isCreate ? null : regionEntry.getValueInVM(owner));
- }
- boolean clearOccured = false;
- if (owner.getIndexMaintenanceSynchronous()) {
- regionEntry.setUpdateInProgress(true);
- }
- try {
- txRemoveOldIndexEntry(txApplyPutContext.getPutOp(), regionEntry);
- if (didDestroy) {
- regionEntry.txDidDestroy(owner.cacheTimeMillis());
- }
- if (txEvent != null) {
- txEvent.addPut(txApplyPutContext.getPutOp(), owner, regionEntry, regionEntry.getKey(),
- newValue, callbackArgument);
- }
- regionEntry.setValueResultOfSearch(txApplyPutContext.getPutOp().isNetSearch());
- try {
- processAndGenerateTXVersionTag(owner, callbackEvent, regionEntry, txEntryState);
- {
- boolean wasTombstone = regionEntry.isTombstone();
- regionEntry.setValue(owner, regionEntry.prepareValueForCache(owner, newValue,
- callbackEvent, !txApplyPutContext.getPutOp().isCreate()));
- if (wasTombstone) {
- owner.unscheduleTombstone(regionEntry);
- }
- }
- if (txApplyPutContext.getPutOp().isCreate()) {
- owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(regionEntry));
- } else if (txApplyPutContext.getPutOp().isUpdate()) {
- owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(regionEntry));
- }
- } catch (RegionClearedException rce) {
- clearOccured = true;
- }
- {
- long lastMod = owner.cacheTimeMillis();
- EntryLogger.logTXPut(_getOwnerObject(), key, newValue);
- regionEntry.updateStatsForPut(lastMod, lastMod);
- owner.txApplyPutPart2(regionEntry, regionEntry.getKey(), lastMod, isCreate, didDestroy,
- clearOccured);
- }
- } finally {
- if (regionEntry != null && owner.getIndexMaintenanceSynchronous()) {
- regionEntry.setUpdateInProgress(false);
- }
- }
- if (isCreate) {
- txApplyPutContext.setOpCompleted(true);
- }
- if (invokeCallbacks) {
- if (isCreate) {
- callbackEvent.makeCreate();
- callbackEvent.setOldValue(null);
- } else {
- if (!oldIsRemoved) {
- callbackEvent.makeUpdate();
- }
- }
- switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
- pendingCallbacks.add(callbackEvent);
- txApplyPutContext.setCallbackEventInPending(true);
- }
- if (!clearOccured) {
- if (isCreate) {
- lruEntryCreate(regionEntry);
- incEntryCount(1);
- } else {
- lruEntryUpdate(regionEntry);
- }
- }
- }
-
- private static class TxApplyPutContext {
- private boolean opCompleted;
- private boolean callbackEventInPending;
- private Operation putOp;
- /**
- * true if the regionEntry is one that we created;
- * false if the regionEntry was an existing one.
- */
- private boolean create;
- private RegionEntry regionEntry;
- private final EntryEventImpl event;
- private final boolean onlyExisting;
- private final Object newValue;
- private final boolean didDestroy;
- private final TXRmtEvent txEvent;
- private final Object callbackArgument;
- private final List<EntryEventImpl> pendingCallbacks;
- private final TXEntryState txEntryState;
- private final boolean hasRemoteOrigin;
- private final boolean invokeCallbacks;
-
- public TxApplyPutContext(boolean opCompleted, boolean callbackEventInPending, Operation putOp,
- EntryEventImpl event, boolean onlyExisting, Object newValue, boolean didDestroy,
- TXRmtEvent txEvent, Object aCallbackArgument, List<EntryEventImpl> pendingCallbacks,
- TXEntryState txEntryState, boolean hasRemoteOrigin, boolean invokeCallbacks) {
- this.opCompleted = opCompleted;
- this.callbackEventInPending = callbackEventInPending;
- this.putOp = putOp;
- this.regionEntry = null;
- this.event = event;
- this.onlyExisting = onlyExisting;
- this.newValue = newValue;
- this.didDestroy = didDestroy;
- this.txEvent = txEvent;
- this.callbackArgument = aCallbackArgument;
- this.pendingCallbacks = pendingCallbacks;
- this.txEntryState = txEntryState;
- this.hasRemoteOrigin = hasRemoteOrigin;
- this.invokeCallbacks = invokeCallbacks;
- }
-
- public boolean isOpCompleted() {
- return opCompleted;
- }
-
- public void setOpCompleted(boolean opCompleted) {
- this.opCompleted = opCompleted;
- }
-
- public boolean isCallbackEventInPending() {
- return callbackEventInPending;
- }
-
- public void setCallbackEventInPending(boolean callbackEventInPending) {
- this.callbackEventInPending = callbackEventInPending;
- }
-
- public Operation getPutOp() {
- return putOp;
- }
-
- public boolean isCreate() {
- return create;
- }
-
- public void setCreate(boolean create) {
- this.create = create;
- }
-
- public RegionEntry getRegionEntry() {
- return regionEntry;
- }
-
- public void setRegionEntry(RegionEntry regionEntry) {
- this.regionEntry = regionEntry;
- }
-
- public EntryEventImpl getEvent() {
- return event;
- }
-
- public boolean isOnlyExisting() {
- return onlyExisting;
- }
-
- public Object getNewValue() {
- return newValue;
- }
-
- public boolean isDidDestroy() {
- return didDestroy;
- }
-
- public TXRmtEvent getTxEvent() {
- return txEvent;
- }
-
- public Object getCallbackArgument() {
- return callbackArgument;
- }
-
- public List<EntryEventImpl> getPendingCallbacks() {
- return pendingCallbacks;
- }
-
- public TXEntryState getTxEntryState() {
- return txEntryState;
- }
-
- public boolean isHasRemoteOrigin() {
- return hasRemoteOrigin;
- }
-
- public boolean isInvokeCallbacks() {
- return invokeCallbacks;
- }
-
- public void makeCreate() {
- putOp = putOp.getCorrespondingCreateOp();
- }
-
- public void makeUpdate() {
- putOp = putOp.getCorrespondingUpdateOp();
+ if (owner.isUsedForPartitionedRegionBucket()) {
+ callbackEvent.makeSerializedNewValue();
+ txHandleWANEvent(owner, callbackEvent, txEntryState);
}
+ RegionMapCommitPut commitPut = new RegionMapCommitPut(this, owner, callbackEvent, putOp,
+ didDestroy, txId, txEvent, pendingCallbacks, txEntryState);
+ commitPut.put();
}
private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl callbackEvent,
@@ -2498,8 +2135,10 @@ public abstract class AbstractRegionMap
/**
* called from txApply* methods to process and generate versionTags.
*/
- private void processAndGenerateTXVersionTag(final LocalRegion owner, EntryEventImpl callbackEvent,
- RegionEntry re, TXEntryState txEntryState) {
+ @Override
+ public void processAndGenerateTXVersionTag(EntryEventImpl callbackEvent, RegionEntry re,
+ TXEntryState txEntryState) {
+ final LocalRegion owner = _getOwner();
if (shouldPerformConcurrencyChecks(owner, callbackEvent)) {
try {
if (txEntryState != null && txEntryState.getRemoteVersionTag() != null) {
@@ -2557,7 +2196,8 @@ public abstract class AbstractRegionMap
*
* @param entry the RegionEntry that contains the value prior to applying the op
*/
- private void txRemoveOldIndexEntry(Operation op, RegionEntry entry) {
+ @Override
+ public void txRemoveOldIndexEntry(Operation op, RegionEntry entry) {
if ((op.isUpdate() && !entry.isInvalid()) || op.isInvalidate() || op.isDestroy()) {
IndexManager idxManager = _getOwner().getIndexManager();
if (idxManager != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index e8d19de..bca76ce 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -3848,4 +3848,9 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
return op.getLatestLastAccessTime();
}
+ @Override
+ public Set adviseNetWrite() {
+ return getCacheDistributionAdvisor().adviseNetWrite();
+ }
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 206338a..5d010ca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -698,8 +698,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
return this.context;
}
- // INTERNAL
- boolean isLocalInvalid() {
+ public boolean isLocalInvalid() {
return testEventFlag(EventFlags.FLAG_LOCAL_INVALID);
}
@@ -2895,4 +2894,15 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
public boolean isOldValueOffHeap() {
return isOffHeapReference(this.oldValue);
}
+
+ /**
+ * If region is currently a bucket
+ * then change it to be the partitioned region that owns that bucket.
+ * Otherwise do nothing.
+ */
+ public void changeRegionToBucketsOwner() {
+ if (getRegion().isUsedForPartitionedRegionBucket()) {
+ setRegion(getRegion().getPartitionedRegion());
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index c24b17e..cafc43f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -289,7 +289,7 @@ public class HARegion extends DistributedRegion {
}
@Override
- protected boolean shouldNotifyBridgeClients() {
+ public boolean shouldNotifyBridgeClients() {
return false;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 6b3f4a0..1a311a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -351,6 +351,11 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag,
long tailKey);
+ void txApplyPutPart2(RegionEntry regionEntry, Object key, long lastModified, boolean isCreate,
+ boolean didDestroy, boolean clearConflict);
+
+ void txApplyPutHandleDidDestroy(Object key);
+
void handleReliableDistribution(Set successfulRecipients);
StoppableCountDownLatch getInitializationLatchBeforeGetInitialImage();
@@ -381,5 +386,15 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
RegionIdleExpiryTask getRegionIdleExpiryTask();
+ boolean isAllEvents();
+
+ boolean shouldDispatchListenerEvent();
+
+ boolean shouldNotifyBridgeClients();
+
+ default Set adviseNetWrite() {
+ return null;
+ }
+
EvictionController getEvictionController();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index fe12140..6f1c6b4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5056,7 +5056,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* Called by AbstractRegionMap txApplyPut when it was told a destroy was also done
* by the transaction.
*/
- void txApplyPutHandleDidDestroy(Object key) {
+ @Override
+ public void txApplyPutHandleDidDestroy(Object key) {
if (this.entryUserAttributes != null) {
this.entryUserAttributes.remove(key);
}
@@ -5120,8 +5121,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
getCachePerfStats().endPut(startPut, false);
}
- void txApplyPutPart2(RegionEntry regionEntry, Object key, long lastModified, boolean isCreate,
- boolean didDestroy, boolean clearConflict) {
+ @Override
+ public void txApplyPutPart2(RegionEntry regionEntry, Object key, long lastModified,
+ boolean isCreate, boolean didDestroy, boolean clearConflict) {
if (this.testCallable != null) {
Operation op = isCreate ? Operation.CREATE : Operation.UPDATE;
this.testCallable.call(this, op, regionEntry);
@@ -10574,7 +10576,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @return true only if it's cache has bridge servers and this is nt a meta region
*/
- protected boolean shouldNotifyBridgeClients() {
+ @Override
+ public boolean shouldNotifyBridgeClients() {
return !this.cache.getCacheServers().isEmpty() && !this.isUsedForPartitionedRegionAdmin
&& !this.isUsedForPartitionedRegionBucket && !this.isUsedForMetaRegion;
}
@@ -10584,7 +10587,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @return true only if this region has a Listener
*/
- boolean shouldDispatchListenerEvent() {
+ @Override
+ public boolean shouldDispatchListenerEvent() {
return hasListener();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index f3d1f25..99aa3f0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -8139,7 +8139,7 @@ public class PartitionedRegion extends LocalRegion
}
@Override
- protected boolean shouldNotifyBridgeClients() {
+ public boolean shouldNotifyBridgeClients() {
return true;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXId.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXId.java
index 8b00cff..61ff743 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXId.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXId.java
@@ -55,6 +55,7 @@ public class TXId extends ExternalizableDSFID implements TransactionId {
this.uniqId = uniqId;
}
+ @Override
public InternalDistributedMember getMemberId() {
return this.memberId;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRmtEvent.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRmtEvent.java
index e924aed..3b9a52c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRmtEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRmtEvent.java
@@ -110,12 +110,12 @@ public class TXRmtEvent implements TransactionEvent {
}
@Retained
- private EntryEventImpl createEvent(LocalRegion r, Operation op, RegionEntry re, Object key,
+ private EntryEventImpl createEvent(InternalRegion r, Operation op, RegionEntry re, Object key,
Object newValue, Object aCallbackArgument) {
DistributedMember originator = ((TXId) this.txId).getMemberId();
// TODO:ASIF :EventID will not be generated with this constructor . Check if
// this is correct
- LocalRegion eventRegion = r;
+ InternalRegion eventRegion = r;
if (r.isUsedForPartitionedRegionBucket()) {
eventRegion = r.getPartitionedRegion();
}
@@ -140,16 +140,16 @@ public class TXRmtEvent implements TransactionEvent {
}
}
- void addDestroy(LocalRegion r, RegionEntry re, Object key, Object aCallbackArgument) {
+ public void addDestroy(InternalRegion r, RegionEntry re, Object key, Object aCallbackArgument) {
addEvent(createEvent(r, Operation.DESTROY, re, key, null, aCallbackArgument));
}
- void addInvalidate(LocalRegion r, RegionEntry re, Object key, Object newValue,
+ public void addInvalidate(InternalRegion r, RegionEntry re, Object key, Object newValue,
Object aCallbackArgument) {
addEvent(createEvent(r, Operation.INVALIDATE, re, key, newValue, aCallbackArgument));
}
- void addPut(Operation putOp, LocalRegion r, RegionEntry re, Object key, Object newValue,
+ public void addPut(Operation putOp, InternalRegion r, RegionEntry re, Object key, Object newValue,
Object aCallbackArgument) {
addEvent(createEvent(r, putOp, re, key, newValue, aCallbackArgument));
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/AbstractRegionMapPut.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/AbstractRegionMapPut.java
new file mode 100644
index 0000000..174ee06
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/AbstractRegionMapPut.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.cache.query.internal.index.IndexManager;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.logging.LogService;
+
+public abstract class AbstractRegionMapPut {
+ private static final Logger logger = LogService.getLogger();
+
+ private final InternalRegion owner;
+ private final FocusedRegionMap focusedRegionMap;
+ private final EntryEventImpl event;
+ private final boolean ownerInitialized;
+
+ private boolean clearOccurred;
+ private long lastModifiedTime;
+ private RegionEntry regionEntry;
+ private boolean create;
+ private boolean completed;
+
+ public AbstractRegionMapPut(FocusedRegionMap focusedRegionMap, InternalRegion owner,
+ EntryEventImpl event) {
+ this.focusedRegionMap = focusedRegionMap;
+ this.owner = owner;
+ this.event = event;
+ this.ownerInitialized = owner.isInitialized();
+ }
+
+ protected boolean isOwnerInitialized() {
+ return ownerInitialized;
+ }
+
+ protected boolean isClearOccurred() {
+ return clearOccurred;
+ }
+
+ protected void setClearOccurred(boolean v) {
+ clearOccurred = v;
+ }
+
+ protected long getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ protected void setLastModifiedTime(long v) {
+ lastModifiedTime = v;
+ }
+
+ protected RegionEntry getRegionEntry() {
+ return regionEntry;
+ }
+
+ private void setRegionEntry(RegionEntry v) {
+ regionEntry = v;
+ }
+
+ /**
+ * @return true if put created a new entry; false if it updated an existing one.
+ */
+ protected boolean isCreate() {
+ return create;
+ }
+
+ private void setCreate(boolean v) {
+ create = v;
+ }
+
+ protected EntryEventImpl getEvent() {
+ return event;
+ }
+
+ protected boolean isCompleted() {
+ return completed;
+ }
+
+ private void setCompleted(boolean b) {
+ completed = b;
+ }
+
+ protected InternalRegion getOwner() {
+ return owner;
+ }
+
+ protected FocusedRegionMap getRegionMap() {
+ return focusedRegionMap;
+ }
+
+ protected abstract boolean isOnlyExisting();
+
+ protected abstract boolean entryExists(RegionEntry regionEntry);
+
+ protected abstract void serializeNewValueIfNeeded();
+
+ protected abstract void runWhileLockedForCacheModification(Runnable r);
+
+ protected abstract void setOldValueForDelta();
+
+ protected abstract void setOldValueInEvent();
+
+ protected abstract void unsetOldValueForDelta();
+
+ protected abstract boolean checkPreconditions();
+
+ protected abstract void invokeCacheWriter();
+
+ protected abstract void createOrUpdateEntry();
+
+ /**
+ * Returns true if getRegionEntry should be removed from the map
+ * because the put did not complete.
+ * Precondition: isCreate()
+ */
+ protected abstract boolean shouldCreatedEntryBeRemoved();
+
+ /**
+ * Called after the put is done but before setCompleted(true)
+ * is called.
+ * Note that the RegionEntry that was modified by the put
+ * is still synchronized when this is called.
+ */
+ protected abstract void doBeforeCompletionActions();
+
+ /**
+ * Called after the put is done.
+ * Always called, even if the put failed.
+ * Note that the RegionEntry that was modified by the put
+ * is no longer synchronized when this is called.
+ */
+ protected abstract void doAfterCompletionActions();
+
+ /**
+ * @return regionEntry if put completed, otherwise null.
+ */
+ public RegionEntry put() {
+ serializeNewValueIfNeeded();
+ runWhileLockedForCacheModification(this::doPut);
+ if (isCompleted()) {
+ return getRegionEntry();
+ } else {
+ return null;
+ }
+ }
+
+ private void doPut() {
+ try {
+ doWithIndexInUpdateMode(this::doPutRetryingIfNeeded);
+ } catch (DiskAccessException dae) {
+ getOwner().handleDiskAccessException(dae);
+ throw dae;
+ } finally {
+ doAfterCompletionActions();
+ }
+ }
+
+ private void doWithIndexInUpdateMode(Runnable r) {
+ final IndexManager oqlIndexManager = getInitializedIndexManager();
+ if (oqlIndexManager != null) {
+ try {
+ r.run();
+ } finally {
+ oqlIndexManager.countDownIndexUpdaters();
+ }
+ } else {
+ r.run();
+ }
+ }
+
+ private IndexManager getInitializedIndexManager() {
+ final IndexManager oqlIndexManager = getOwner().getIndexManager();
+ if (oqlIndexManager != null) {
+ oqlIndexManager.waitForIndexInit();
+ }
+ return oqlIndexManager;
+ }
+
+ private void doPutRetryingIfNeeded() {
+ do {
+ if (!findAndSaveExistingEntry()) {
+ return;
+ }
+ createNewEntryIfNeeded();
+ } while (!addRegionEntryToMapAndDoPut());
+ }
+
+ /**
+ * If an existing one is found, save it by calling setRegionEntry.
+ *
+ * @return false if an existing entry was not found and this put requires
+ * an existing one; otherwise returns true.
+ */
+ private boolean findAndSaveExistingEntry() {
+ RegionEntry re = getRegionMap().getEntry(getEvent());
+ if (isOnlyExisting() && !entryExists(re)) {
+ setRegionEntry(null);
+ return false;
+ }
+ setRegionEntry(re);
+ return true;
+ }
+
+ private void createNewEntryIfNeeded() {
+ setCreate(getRegionEntry() == null);
+ if (isCreate()) {
+ final Object key = getEvent().getKey();
+ RegionEntry newEntry =
+ getRegionMap().getEntryFactory().createEntry(getOwner(), key, Token.REMOVED_PHASE1);
+ setRegionEntry(newEntry);
+ }
+ }
+
+ /**
+ * @return false if caller should retry
+ */
+ private boolean addRegionEntryToMapAndDoPut() {
+ synchronized (getRegionEntry()) {
+ putIfAbsentNewEntry();
+ return doPutOnRegionEntryInMap();
+ }
+ }
+
+ private void putIfAbsentNewEntry() {
+ if (isCreate()) {
+ RegionEntry oldRe = getRegionMap().putEntryIfAbsent(getEvent().getKey(), getRegionEntry());
+ if (oldRe != null) {
+ setCreate(false);
+ setRegionEntry(oldRe);
+ }
+ }
+ }
+
+ /**
+ * @return false if caller should retry because entry is no longer in the map
+ */
+ private boolean doPutOnRegionEntryInMap() {
+ synchronized (getRegionEntry()) {
+ if (isRegionEntryRemoved()) {
+ return false;
+ }
+ doPutOnSynchronizedRegionEntry();
+ return true;
+ }
+ }
+
+ private void doPutOnSynchronizedRegionEntry() {
+ setOldValueForDelta();
+ try {
+ setOldValueInEvent();
+ doPutIfPreconditionsSatisified();
+ } finally {
+ unsetOldValueForDelta();
+ if (isCreate() && shouldCreatedEntryBeRemoved()) {
+ getRegionMap().removeEntry(getEvent().getKey(), getRegionEntry(), false);
+ }
+ }
+ }
+
+ private void doPutIfPreconditionsSatisified() {
+ if (!checkPreconditions()) {
+ return;
+ }
+ invokeCacheWriter();
+ runWithIndexUpdatingInProgress(this::doPutAndDeliverEvent);
+ }
+
+ private void doPutAndDeliverEvent() {
+ createOrUpdateEntry();
+ doBeforeCompletionActions();
+ setCompleted(true);
+ }
+
+ private void runWithIndexUpdatingInProgress(Runnable r) {
+ notifyIndex(true);
+ try {
+ r.run();
+ } finally {
+ notifyIndex(false);
+ }
+ }
+
+ private void notifyIndex(boolean isUpdating) {
+ if (getOwner().getIndexMaintenanceSynchronous()) {
+ getRegionEntry().setUpdateInProgress(isUpdating);
+ }
+ }
+
+ /**
+ * @return true if the entry is in the final stage of removal
+ */
+ private boolean isRegionEntryRemoved() {
+ final RegionEntry re = getRegionEntry();
+ if (re.isRemovedPhase2()) {
+ getOwner().getCachePerfStats().incRetries();
+ getRegionMap().getEntryMap().remove(getEvent().getKey(), re);
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/FocusedRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/FocusedRegionMap.java
index c4064fa..328e943 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/map/FocusedRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/FocusedRegionMap.java
@@ -16,10 +16,12 @@ package org.apache.geode.internal.cache.map;
import java.util.Map;
+import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.RegionEntryFactory;
+import org.apache.geode.internal.cache.TXEntryState;
public interface FocusedRegionMap {
@@ -46,9 +48,16 @@ public interface FocusedRegionMap {
void lruEntryCreate(RegionEntry regionEntry);
+ void incEntryCount(int delta);
+
void runWhileEvictionDisabled(Runnable runnable);
void lruUpdateCallback();
void resetThreadLocals();
+
+ void txRemoveOldIndexEntry(Operation putOp, RegionEntry regionEntry);
+
+ void processAndGenerateTXVersionTag(EntryEventImpl callbackEvent, RegionEntry regionEntry,
+ TXEntryState txEntryState);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapCommitPut.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapCommitPut.java
new file mode 100644
index 0000000..9536ce6
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapCommitPut.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.RegionClearedException;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.TXEntryState;
+import org.apache.geode.internal.cache.TXRmtEvent;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.internal.sequencelog.EntryLogger;
+
+/**
+ * Does a put for a transaction that is being committed.
+ */
+public class RegionMapCommitPut extends AbstractRegionMapPut {
+ private static final Logger logger = LogService.getLogger();
+
+ private final boolean onlyExisting;
+ private final boolean didDestroy;
+ private final TXRmtEvent txEvent;
+ private final List<EntryEventImpl> pendingCallbacks;
+ private final TXEntryState txEntryState;
+ private final boolean remoteOrigin;
+ private final boolean invokeCallbacks;
+
+ private boolean callbackEventInPending;
+ private Operation putOp;
+
+ public RegionMapCommitPut(FocusedRegionMap focusedRegionMap, InternalRegion owner,
+ @Released EntryEventImpl callbackEvent, Operation putOp, boolean didDestroy,
+ TransactionId txId, TXRmtEvent txEvent, List<EntryEventImpl> pendingCallbacks,
+ TXEntryState txEntryState) {
+ super(focusedRegionMap, owner, callbackEvent);
+ this.putOp = putOp;
+ this.didDestroy = didDestroy;
+ this.txEvent = txEvent;
+ this.pendingCallbacks = pendingCallbacks;
+ this.txEntryState = txEntryState;
+ this.remoteOrigin = !txId.getMemberId().equals(owner.getMyId());
+ this.invokeCallbacks = shouldInvokeCallbacks();
+ final boolean isTXHost = txEntryState != null;
+ // If the transaction originated on another member and we do not host the transaction entry
+ // and are not a replicate or partitioned (i.e. !isAllEvents)
+ // then only apply the update to existing entries.
+ // If the transaction originated on another member and we do not host the transaction entry
+ // and we are a replicate or partitioned
+ // then only apply the update to existing entries when the operation is an update and we
+ // are initialized.
+ // Otherwise use the standard create/update logic.
+ this.onlyExisting = remoteOrigin && !isTXHost
+ && (!owner.isAllEvents() || (!putOp.isCreate() && isOwnerInitialized()));
+ }
+
+ boolean isRemoteOrigin() {
+ return remoteOrigin;
+ }
+
+ boolean isInvokeCallbacks() {
+ return invokeCallbacks;
+ }
+
+ private Operation getPutOp() {
+ return putOp;
+ }
+
+ private boolean isPutOpCreate() {
+ return getPutOp().isCreate();
+ }
+
+ private boolean shouldInvokeCallbacks() {
+ InternalRegion owner = getOwner();
+ boolean isPartitioned = owner.isUsedForPartitionedRegionBucket();
+ if (isPartitioned) {
+ owner = owner.getPartitionedRegion();
+ }
+ return (isPartitioned || isOwnerInitialized()) && (owner.shouldDispatchListenerEvent()
+ || owner.shouldNotifyBridgeClients() || owner.getConcurrencyChecksEnabled());
+ }
+
+ private void setCallbackEventInPending(boolean v) {
+ this.callbackEventInPending = v;
+ }
+
+ boolean isCallbackEventInPending() {
+ return this.callbackEventInPending;
+ }
+
+ private void makePutOpCreate() {
+ putOp = putOp.getCorrespondingCreateOp();
+ }
+
+ private void makePutOpUpdate() {
+ putOp = putOp.getCorrespondingUpdateOp();
+ }
+
+ @Override
+ protected boolean isOnlyExisting() {
+ return this.onlyExisting;
+ }
+
+ @Override
+ protected boolean entryExists(RegionEntry regionEntry) {
+ return regionEntry != null && !regionEntry.isDestroyedOrRemoved();
+ }
+
+ @Override
+ protected void serializeNewValueIfNeeded() {
+ // nothing needed
+ }
+
+ @Override
+ protected void runWhileLockedForCacheModification(Runnable r) {
+ // commit has already done the locking
+ r.run();
+ }
+
+ @Override
+ protected void setOldValueForDelta() {
+ // nothing needed
+ }
+
+ @Override
+ protected void setOldValueInEvent() {
+ if (isCreate()) {
+ makePutOpCreate();
+ } else {
+ if (!getRegionEntry().isDestroyedOrRemoved()) {
+ makePutOpUpdate();
+ } else {
+ makePutOpCreate();
+ }
+ }
+ if (isPutOpCreate()) {
+ getEvent().makeCreate();
+ getEvent().setOldValue(null);
+ } else {
+ getEvent().makeUpdate();
+ Object oldValue = getRegionEntry().getValueInVM(getOwner());
+ getEvent().setOldValue(oldValue);
+ }
+ }
+
+ @Override
+ protected void unsetOldValueForDelta() {
+ // nothing needed
+ }
+
+ @Override
+ protected boolean checkPreconditions() {
+ if (isOnlyExisting() && isPutOpCreate()) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected void invokeCacheWriter() {
+ // nothing needed
+ }
+
+ @Override
+ protected void createOrUpdateEntry() {
+ final RegionEntry regionEntry = getRegionEntry();
+ final EntryEventImpl callbackEvent = getEvent();
+ final InternalRegion owner = getOwner();
+ final FocusedRegionMap regionMap = getRegionMap();
+ final Object key = callbackEvent.getKey();
+ final Object newValue = computeNewValue(callbackEvent);
+
+ final int oldSize = isPutOpCreate() ? 0 : owner.calculateRegionEntryValueSize(regionEntry);
+ callbackEvent.setRegionEntry(regionEntry);
+ regionMap.txRemoveOldIndexEntry(getPutOp(), regionEntry);
+ setLastModifiedTime(owner.cacheTimeMillis());
+ if (didDestroy) {
+ regionEntry.txDidDestroy(getLastModifiedTime());
+ }
+ if (txEvent != null) {
+ txEvent.addPut(getPutOp(), owner, regionEntry, key, newValue,
+ callbackEvent.getCallbackArgument());
+ }
+ regionEntry.setValueResultOfSearch(getPutOp().isNetSearch());
+ try {
+ regionMap.processAndGenerateTXVersionTag(callbackEvent, regionEntry, txEntryState);
+ setNewValueOnRegionEntry(newValue);
+ int newSize = owner.calculateRegionEntryValueSize(regionEntry);
+ if (isPutOpCreate()) {
+ owner.updateSizeOnCreate(key, newSize);
+ } else {
+ owner.updateSizeOnPut(key, oldSize, newSize);
+ }
+ } catch (RegionClearedException rce) {
+ setClearOccurred(true);
+ }
+ EntryLogger.logTXPut(owner, key, newValue);
+ }
+
+ private void setNewValueOnRegionEntry(final Object newValue) throws RegionClearedException {
+ final RegionEntry regionEntry = getRegionEntry();
+ final InternalRegion owner = getOwner();
+ final boolean wasTombstone = regionEntry.isTombstone();
+ final Object preparedValue =
+ regionEntry.prepareValueForCache(owner, newValue, getEvent(), !isPutOpCreate());
+ regionEntry.setValue(owner, preparedValue);
+ if (wasTombstone) {
+ owner.unscheduleTombstone(regionEntry);
+ }
+ }
+
+ private static Object computeNewValue(EntryEventImpl callbackEvent) {
+ Object newValue = callbackEvent.getRawNewValueAsHeapObject();
+ if (newValue == null) {
+ if (callbackEvent.isLocalInvalid()) {
+ newValue = Token.LOCAL_INVALID;
+ } else {
+ newValue = Token.INVALID;
+ }
+ }
+ return newValue;
+ }
+
+ @Override
+ protected boolean shouldCreatedEntryBeRemoved() {
+ return !isCompleted();
+ }
+
+ @Override
+ protected void doBeforeCompletionActions() {
+ final RegionEntry regionEntry = getRegionEntry();
+ final EntryEventImpl callbackEvent = getEvent();
+ final InternalRegion owner = getOwner();
+ final FocusedRegionMap regionMap = getRegionMap();
+ final Object key = callbackEvent.getKey();
+
+ regionEntry.updateStatsForPut(getLastModifiedTime(), getLastModifiedTime());
+ owner.txApplyPutPart2(regionEntry, key, getLastModifiedTime(), isPutOpCreate(), didDestroy,
+ isClearOccurred());
+ if (isInvokeCallbacks()) {
+ callbackEvent.changeRegionToBucketsOwner();
+ callbackEvent.setOriginRemote(isRemoteOrigin());
+ pendingCallbacks.add(callbackEvent);
+ setCallbackEventInPending(true);
+ }
+ if (!isClearOccurred()) {
+ if (isCreate()) {
+ regionMap.lruEntryCreate(regionEntry);
+ regionMap.incEntryCount(1);
+ } else {
+ regionMap.lruEntryUpdate(regionEntry);
+ }
+ }
+ }
+
+ @Override
+ protected void doAfterCompletionActions() {
+ if (isOnlyExisting() && !isCompleted()) {
+ if (didDestroy) {
+ getOwner().txApplyPutHandleDidDestroy(getEvent().getKey());
+ }
+ if (isInvokeCallbacks()) {
+ getEvent().makeUpdate();
+ getOwner().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, getEvent(), false);
+ }
+ }
+ if (getOwner().getConcurrencyChecksEnabled() && txEntryState != null) {
+ txEntryState.setVersionTag(getEvent().getVersionTag());
+ }
+ if (!isCallbackEventInPending()) {
+ getEvent().release();
+ }
+ }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java
index f089db9..506b7b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java
@@ -21,10 +21,7 @@ import java.util.Set;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.query.internal.index.IndexManager;
-import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.CachePerfStats;
-import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EntryEventSerialization;
import org.apache.geode.internal.cache.InternalRegion;
@@ -42,15 +39,13 @@ import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.sequencelog.EntryLogger;
-public class RegionMapPut {
- private final InternalRegion owner;
- private final FocusedRegionMap focusedRegionMap;
- private final EntryEventImpl event;
+public class RegionMapPut extends AbstractRegionMapPut {
+ private final CacheModificationLock cacheModificationLock;
+ private final EntryEventSerialization entryEventSerialization;
private final boolean ifNew;
private final boolean ifOld;
private final boolean overwriteDestroyed;
private final boolean requireOldValue;
- private final boolean uninitialized;
private final boolean retrieveOldValueForDelta;
private final boolean replaceOnClient;
private final boolean onlyExisting;
@@ -58,13 +53,7 @@ public class RegionMapPut {
private final CacheWriter cacheWriter;
private final Set netWriteRecipients;
private final Object expectedOldValue;
- private final CacheModificationLock cacheModificationLock;
- private final EntryEventSerialization entryEventSerialization;
- private boolean clearOccured;
- private long lastModifiedTime;
- private RegionEntry regionEntry;
- private boolean create;
- private boolean completed;
+
@Released
private Object oldValueForDelta;
@@ -72,35 +61,22 @@ public class RegionMapPut {
CacheModificationLock cacheModificationLock, EntryEventSerialization entryEventSerialization,
EntryEventImpl event, boolean ifNew, boolean ifOld, boolean overwriteDestroyed,
boolean requireOldValue, Object expectedOldValue) {
- if (owner == null) {
- // "fix" for bug 32440
- Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
- }
- this.focusedRegionMap = focusedRegionMap;
- this.owner = owner;
+ super(focusedRegionMap, owner, event);
this.cacheModificationLock = cacheModificationLock;
this.entryEventSerialization = entryEventSerialization;
- this.event = event;
this.ifNew = ifNew;
this.ifOld = ifOld;
this.overwriteDestroyed = overwriteDestroyed;
this.requireOldValue = requireOldValue;
- this.uninitialized = !owner.isInitialized();
this.retrieveOldValueForDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null;
- this.replaceOnClient =
- event.getOperation() == Operation.REPLACE && owner.getServerProxy() != null;
+ this.replaceOnClient = event.getOperation() == Operation.REPLACE && owner.hasServerProxy();
this.onlyExisting = ifOld && !isReplaceOnClient();
this.cacheWriter = owner.basicGetWriter();
this.cacheWrite = !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
&& (getCacheWriter() != null || owner.hasServerProxy() || owner.getScope().isDistributed());
this.expectedOldValue = expectedOldValue;
- if (isCacheWrite()) {
- if (getCacheWriter() == null && owner.getScope().isDistributed()) {
- this.netWriteRecipients =
- ((DistributedRegion) owner).getCacheDistributionAdvisor().adviseNetWrite();
- } else {
- this.netWriteRecipients = null;
- }
+ if (isCacheWrite() && getCacheWriter() == null) {
+ this.netWriteRecipients = owner.adviseNetWrite();
} else {
this.netWriteRecipients = null;
}
@@ -122,23 +98,15 @@ public class RegionMapPut {
return requireOldValue;
}
- private boolean isUninitialized() {
- return uninitialized;
- }
-
- private boolean isRetrieveOldValueForDelta() {
+ boolean isRetrieveOldValueForDelta() {
return retrieveOldValueForDelta;
}
- private boolean isReplaceOnClient() {
+ boolean isReplaceOnClient() {
return replaceOnClient;
}
- private boolean isOnlyExisting() {
- return onlyExisting;
- }
-
- private boolean isCacheWrite() {
+ boolean isCacheWrite() {
return cacheWrite;
}
@@ -154,53 +122,6 @@ public class RegionMapPut {
return expectedOldValue;
}
- private boolean getClearOccured() {
- return clearOccured;
- }
-
- private void setClearOccured(boolean clearOccured) {
- this.clearOccured = clearOccured;
- }
-
- private long getLastModifiedTime() {
- return lastModifiedTime;
- }
-
- private void setLastModifiedTime(long lastModifiedTime) {
- this.lastModifiedTime = lastModifiedTime;
- }
-
- private RegionEntry getRegionEntry() {
- return regionEntry;
- }
-
- private void setRegionEntry(RegionEntry regionEntry) {
- this.regionEntry = regionEntry;
- }
-
- /**
- * @return true if put created a new entry; false if it updated an existing one.
- */
- private boolean isCreate() {
- return create;
- }
-
- private void setCreate(boolean v) {
- this.create = v;
- }
-
- private EntryEventImpl getEvent() {
- return event;
- }
-
- private boolean isCompleted() {
- return this.completed;
- }
-
- private void setCompleted(boolean b) {
- this.completed = b;
- }
-
private Object getOldValueForDelta() {
return this.oldValueForDelta;
}
@@ -209,74 +130,23 @@ public class RegionMapPut {
this.oldValueForDelta = value;
}
- private InternalRegion getOwner() {
- return owner;
+ @Override
+ protected boolean isOnlyExisting() {
+ return onlyExisting;
}
- private FocusedRegionMap getRegionMap() {
- return focusedRegionMap;
+ @Override
+ protected boolean entryExists(RegionEntry regionEntry) {
+ return regionEntry != null && !regionEntry.isTombstone();
}
- /**
- * @return regionEntry if put completed, otherwise null.
- */
- public RegionEntry put() {
+ @Override
+ protected void serializeNewValueIfNeeded() {
entryEventSerialization.serializeNewValueIfNeeded(getOwner(), getEvent());
- runWhileLockedForCacheModification(this::doPut);
- if (isCompleted()) {
- return getRegionEntry();
- } else {
- return null;
- }
- }
-
- private void doPut() {
- try {
- doWithIndexInUpdateMode(this::doPutRetryingIfNeeded);
- } catch (DiskAccessException dae) {
- getOwner().handleDiskAccessException(dae);
- throw dae;
- } finally {
- doAfterPut();
- }
}
- private void doWithIndexInUpdateMode(Runnable r) {
- final IndexManager oqlIndexManager = getInitializedIndexManager();
- if (oqlIndexManager != null) {
- try {
- r.run();
- } finally {
- oqlIndexManager.countDownIndexUpdaters();
- }
- } else {
- r.run();
- }
- }
-
- private IndexManager getInitializedIndexManager() {
- IndexManager oqlIndexManager;
- // Fix for Bug #44431. We do NOT want to update the region and wait
- // later for index INIT as region.clear() can cause inconsistency if
- // happened in parallel as it also does index INIT.
- oqlIndexManager = getOwner().getIndexManager();
- if (oqlIndexManager != null) {
- oqlIndexManager.waitForIndexInit();
- }
- return oqlIndexManager;
- }
-
- private void doPutRetryingIfNeeded() {
- do {
- setRegionEntry(null);
- if (!findExistingEntry()) {
- return;
- }
- createNewEntryIfNeeded();
- } while (!addRegionEntryToMapAndDoPut());
- }
-
- private void runWhileLockedForCacheModification(Runnable r) {
+ @Override
+ protected void runWhileLockedForCacheModification(Runnable r) {
cacheModificationLock.lockForCacheModification(getOwner(), getEvent());
try {
r.run();
@@ -285,234 +155,17 @@ public class RegionMapPut {
}
}
-
- /**
- * @return false if an existing entry was not found and this put requires
- * an existing one; otherwise returns true.
- */
- private boolean findExistingEntry() {
- RegionEntry re = getRegionMap().getEntry(getEvent());
- if (isOnlyExisting()) {
- if (re == null || re.isTombstone()) {
- return false;
- }
- }
- setRegionEntry(re);
- return true;
- }
-
- /**
- * Stores the created entry in getRegionEntry.
- */
- private void createNewEntryIfNeeded() {
- setCreate(getRegionEntry() == null);
- if (isCreate()) {
- final Object key = getEvent().getKey();
- RegionEntry newEntry =
- getRegionMap().getEntryFactory().createEntry(getOwner(), key, Token.REMOVED_PHASE1);
- setRegionEntry(newEntry);
- }
- }
-
- /**
- * @return false if caller should retry
- */
- private boolean addRegionEntryToMapAndDoPut() {
- synchronized (getRegionEntry()) {
- putIfAbsentNewEntry();
- return doPutOnRegionEntryInMap();
- }
- }
-
- private void putIfAbsentNewEntry() {
- if (isCreate()) {
- RegionEntry oldRe = getRegionMap().putEntryIfAbsent(getEvent().getKey(), getRegionEntry());
- if (oldRe != null) {
- setCreate(false);
- setRegionEntry(oldRe);
- }
- }
- }
-
- /**
- * @return false if caller should retry because entry is no longer in the map
- */
- private boolean doPutOnRegionEntryInMap() {
- final RegionEntry re = getRegionEntry();
-
- synchronized (re) {
- if (isRegionEntryRemoved()) {
- return false;
- }
-
- setOldValueForDelta();
- try {
- setOldValueInEvent();
- doPutIfPreconditionsPass();
- return true;
- } finally {
- OffHeapHelper.release(getOldValueForDelta());
- setOldValueForDelta(null);
- if (isCreate() && re.getValueAsToken() == Token.REMOVED_PHASE1) {
- // Region entry remove needs to be done while still synced on re.
- getRegionMap().removeEntry(getEvent().getKey(), re, false);
- }
- }
- }
- }
-
- private void doAfterPut() {
- if (isCompleted()) {
- try {
- final boolean invokeListeners = getEvent().basicGetNewValue() != Token.TOMBSTONE;
- getOwner().basicPutPart3(getEvent(), getRegionEntry(), !isUninitialized(),
- getLastModifiedTime(), invokeListeners, isIfNew(), isIfOld(), getExpectedOldValue(),
- isRequireOldValue());
- } finally {
- if (!getClearOccured()) {
- try {
- getRegionMap().lruUpdateCallback();
- } catch (DiskAccessException dae) {
- getOwner().handleDiskAccessException(dae);
- throw dae;
- }
- }
- }
- } else {
- getRegionMap().resetThreadLocals();
- }
- }
-
- /**
- * @return false if precondition indicates that
- * the put should not be done.
- */
- private boolean checkPreconditions() {
- if (continueUpdate() && continueOverwriteDestroyed() && satisfiesExpectedOldValue()) {
- return true;
- }
- return false;
- }
-
- private void doPutIfPreconditionsPass() {
- if (!checkPreconditions()) {
- return;
- }
- invokeCacheWriter();
- runWithIndexUpdatingInProgress(this::doPutAndDeliverEvent);
- }
-
- private void doPutAndDeliverEvent() {
- final EntryEventImpl event = getEvent();
- createOrUpdateEntry();
- if (isUninitialized()) {
- event.inhibitCacheListenerNotification(true);
- }
- updateLru();
-
- final RegionEntry re = getRegionEntry();
- long lastModTime = getOwner().basicPutPart2(event, re, !isUninitialized(),
- getLastModifiedTime(), getClearOccured());
- setLastModifiedTime(lastModTime);
- setCompleted(true);
- }
-
- private void runWithIndexUpdatingInProgress(Runnable r) {
- notifyIndex(true);
- try {
- r.run();
- } finally {
- notifyIndex(false);
- }
- }
-
- private void notifyIndex(boolean isUpdating) {
- if (getOwner().getIndexMaintenanceSynchronous()) {
- getRegionEntry().setUpdateInProgress(isUpdating);
- }
- }
-
- private void createOrUpdateEntry() {
- final EntryEventImpl event = getEvent();
- try {
- if (isUpdate()) {
- updateEntry();
- } else {
- createEntry();
- }
- getOwner().recordEvent(event);
- } catch (RegionClearedException rce) {
- setClearOccured(true);
- getOwner().recordEvent(event);
- } catch (ConcurrentCacheModificationException ccme) {
- VersionTag tag = event.getVersionTag();
- if (tag != null && tag.isTimeStampUpdated()) {
- getOwner().notifyTimestampsToGateways(event);
- }
- throw ccme;
- }
- }
-
- private boolean isUpdate() {
- if (isCacheWrite() && getEvent().getOperation().isUpdate()) {
- // if there is a cacheWriter, type of event has already been set
- return true;
- }
- if (isReplaceOnClient()) {
- return true;
- }
- if (!getRegionEntry().isRemoved()) {
- return true;
- }
- return false;
- }
-
- private void setOldValueForDelta() {
+ @Override
+ protected void setOldValueForDelta() {
if (isRetrieveOldValueForDelta()) {
getRegionMap().runWhileEvictionDisabled(() -> {
- // Old value is faulted in from disk if not found in memory.
setOldValueForDelta(getRegionEntry().getValue(getOwner()));
- // OFFHEAP: if we are synced on region entry no issue since we can use ARE's ref
});
}
}
- /**
- * If the re goes into removed2 state, it will be removed from the map.
- *
- * @return true if re was remove phase 2
- */
- private boolean isRegionEntryRemoved() {
- final RegionEntry re = getRegionEntry();
- if (re.isRemovedPhase2()) {
- getOwner().getCachePerfStats().incRetries();
- getRegionMap().getEntryMap().remove(getEvent().getKey(), re);
- return true;
- } else {
- return false;
- }
- }
-
- private boolean satisfiesExpectedOldValue() {
- // replace is propagated to server, so no need to check
- // satisfiesOldValue on client
- final EntryEventImpl event = getEvent();
- if (getExpectedOldValue() != null && !isReplaceOnClient()) {
- assert event.getOperation().guaranteesOldValue();
- // We already called setOldValueInEvent so the event will have the old value.
- @Unretained
- Object v = event.getRawOldValue();
- // Note that v will be null instead of INVALID because setOldValue
- // converts INVALID to null.
- // But checkExpectedOldValue handle this and says INVALID equals null.
- if (!AbstractRegionEntry.checkExpectedOldValue(getExpectedOldValue(), v, event.getRegion())) {
- return false;
- }
- }
- return true;
- }
-
- private void setOldValueInEvent() {
+ @Override
+ protected void setOldValueInEvent() {
final EntryEventImpl event = getEvent();
final RegionEntry re = getRegionEntry();
event.setRegionEntry(re);
@@ -563,31 +216,68 @@ public class RegionMapPut {
}
}
- private void createEntry() throws RegionClearedException {
+ @Override
+ protected void unsetOldValueForDelta() {
+ OffHeapHelper.release(getOldValueForDelta());
+ setOldValueForDelta(null);
+ }
+
+ @Override
+ protected void invokeCacheWriter() {
final EntryEventImpl event = getEvent();
- final RegionEntry re = getRegionEntry();
- final boolean wasTombstone = re.isTombstone();
- getRegionMap().processVersionTag(re, event);
- event.putNewEntry(getOwner(), re);
- updateSize(0, false, wasTombstone);
- if (!event.getRegion().isInitialized()) {
- getOwner().getImageState().removeDestroyedEntry(event.getKey());
+ if (getOwner().isInitialized() && isCacheWrite()) {
+ if (!isReplaceOnClient()) {
+ if (getRegionEntry().isDestroyedOrRemoved()) {
+ event.makeCreate();
+ } else {
+ event.makeUpdate();
+ }
+ }
+ getOwner().cacheWriteBeforePut(event, getNetWriteRecipients(), getCacheWriter(),
+ isRequireOldValue(), getExpectedOldValue());
+ }
+ if (!getOwner().isInitialized() && !isCacheWrite()) {
+ event.oldValueNotAvailable();
}
}
- private void updateEntry() throws RegionClearedException {
+ @Override
+ protected void createOrUpdateEntry() {
+ try {
+ if (isUpdate()) {
+ updateEntry();
+ } else {
+ createEntry();
+ }
+ } catch (RegionClearedException rce) {
+ setClearOccurred(true);
+ } catch (ConcurrentCacheModificationException ccme) {
+ final EntryEventImpl event = getEvent();
+ VersionTag tag = event.getVersionTag();
+ if (tag != null && tag.isTimeStampUpdated()) {
+ getOwner().notifyTimestampsToGateways(event);
+ }
+ throw ccme;
+ }
+ }
+
+ @Override
+ protected void doBeforeCompletionActions() {
final EntryEventImpl event = getEvent();
+ getOwner().recordEvent(event);
+ if (!isOwnerInitialized()) {
+ event.inhibitCacheListenerNotification(true);
+ }
+ updateLru();
+
final RegionEntry re = getRegionEntry();
- final boolean wasTombstone = re.isTombstone();
- final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
- getRegionMap().processVersionTag(re, event);
- event.putExistingEntry(event.getRegion(), re, isRequireOldValue(), getOldValueForDelta());
- EntryLogger.logPut(event);
- updateSize(oldSize, true/* isUpdate */, wasTombstone);
+ long lastModTime = getOwner().basicPutPart2(event, re, isOwnerInitialized(),
+ getLastModifiedTime(), isClearOccurred());
+ setLastModifiedTime(lastModTime);
}
private void updateLru() {
- if (!getClearOccured()) {
+ if (!isClearOccurred()) {
if (getEvent().getOperation().isCreate()) {
getRegionMap().lruEntryCreate(getRegionEntry());
} else {
@@ -596,44 +286,58 @@ public class RegionMapPut {
}
}
- private void invokeCacheWriter() {
- final EntryEventImpl event = getEvent();
- // invoke listeners only if region is initialized
- if (getOwner().isInitialized() && isCacheWrite()) {
- // event.setOldValue already called in setOldValueInEvent
+ @Override
+ protected boolean shouldCreatedEntryBeRemoved() {
+ return getRegionEntry().getValueAsToken() == Token.REMOVED_PHASE1;
+ }
- // bug #42638 for replaceOnClient, do not make the event create
- // or update since replace must propagate to server
- if (!isReplaceOnClient()) {
- if (getRegionEntry().isDestroyedOrRemoved()) {
- event.makeCreate();
- } else {
- event.makeUpdate();
+ @Override
+ protected void doAfterCompletionActions() {
+ if (isCompleted()) {
+ try {
+ final boolean invokeListeners = getEvent().basicGetNewValue() != Token.TOMBSTONE;
+ getOwner().basicPutPart3(getEvent(), getRegionEntry(), isOwnerInitialized(),
+ getLastModifiedTime(), invokeListeners, isIfNew(), isIfOld(), getExpectedOldValue(),
+ isRequireOldValue());
+ } finally {
+ if (!isClearOccurred()) {
+ try {
+ getRegionMap().lruUpdateCallback();
+ } catch (DiskAccessException dae) {
+ getOwner().handleDiskAccessException(dae);
+ throw dae;
+ }
}
}
- getOwner().cacheWriteBeforePut(event, getNetWriteRecipients(), getCacheWriter(),
- isRequireOldValue(), getExpectedOldValue());
- }
- if (!getOwner().isInitialized() && !isCacheWrite()) {
- // block setting of old value in putNewValueNoSync, don't need it
- event.oldValueNotAvailable();
+ } else {
+ getRegionMap().resetThreadLocals();
}
}
- private boolean continueOverwriteDestroyed() {
- Token oldValueInVM = getRegionEntry().getValueAsToken();
- // if region is under GII, check if token is destroyed
- if (!isOverwriteDestroyed()) {
- if (!getOwner().isInitialized()
- && (oldValueInVM == Token.DESTROYED || oldValueInVM == Token.TOMBSTONE)) {
- getEvent().setOldValueDestroyedToken();
- return false;
- }
+ private boolean isUpdate() {
+ if (isCacheWrite() && getEvent().getOperation().isUpdate()) {
+ // if there is a cacheWriter, type of event has already been set
+ return true;
}
- if (isIfNew() && !Token.isRemoved(oldValueInVM)) {
- return false;
+ if (isReplaceOnClient()) {
+ return true;
}
- return true;
+ if (!getRegionEntry().isRemoved()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @return false if precondition indicates that
+ * the put should not be done.
+ */
+ @Override
+ protected boolean checkPreconditions() {
+ if (continueUpdate() && continueOverwriteDestroyed() && satisfiesExpectedOldValue()) {
+ return true;
+ }
+ return false;
}
private boolean continueUpdate() {
@@ -660,6 +364,64 @@ public class RegionMapPut {
return true;
}
+ private boolean continueOverwriteDestroyed() {
+ Token oldValueInVM = getRegionEntry().getValueAsToken();
+ // if region is under GII, check if token is destroyed
+ if (!isOverwriteDestroyed()) {
+ if (!getOwner().isInitialized()
+ && (oldValueInVM == Token.DESTROYED || oldValueInVM == Token.TOMBSTONE)) {
+ getEvent().setOldValueDestroyedToken();
+ return false;
+ }
+ }
+ if (isIfNew() && !Token.isRemoved(oldValueInVM)) {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean satisfiesExpectedOldValue() {
+ // replace is propagated to server, so no need to check
+ // satisfiesOldValue on client
+ final EntryEventImpl event = getEvent();
+ if (getExpectedOldValue() != null && !isReplaceOnClient()) {
+ assert event.getOperation().guaranteesOldValue();
+ // We already called setOldValueInEvent so the event will have the old value.
+ @Unretained
+ Object v = event.getRawOldValue();
+ // Note that v will be null instead of INVALID because setOldValue
+ // converts INVALID to null.
+ // But checkExpectedOldValue handle this and says INVALID equals null.
+ if (!AbstractRegionEntry.checkExpectedOldValue(getExpectedOldValue(), v, event.getRegion())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void createEntry() throws RegionClearedException {
+ final EntryEventImpl event = getEvent();
+ final RegionEntry re = getRegionEntry();
+ final boolean wasTombstone = re.isTombstone();
+ getRegionMap().processVersionTag(re, event);
+ event.putNewEntry(getOwner(), re);
+ updateSize(0, false, wasTombstone);
+ if (!event.getRegion().isInitialized()) {
+ getOwner().getImageState().removeDestroyedEntry(event.getKey());
+ }
+ }
+
+ private void updateEntry() throws RegionClearedException {
+ final EntryEventImpl event = getEvent();
+ final RegionEntry re = getRegionEntry();
+ final boolean wasTombstone = re.isTombstone();
+ final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
+ getRegionMap().processVersionTag(re, event);
+ event.putExistingEntry(event.getRegion(), re, isRequireOldValue(), getOldValueForDelta());
+ EntryLogger.logPut(event);
+ updateSize(oldSize, true/* isUpdate */, wasTombstone);
+ }
+
private void updateSize(int oldSize, boolean isUpdate, boolean wasTombstone) {
final EntryEventImpl event = getEvent();
final Object key = event.getKey();
@@ -676,4 +438,5 @@ public class RegionMapPut {
}
}
}
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index d93a9fc..057697a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1804,7 +1804,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
@Override
- protected boolean shouldNotifyBridgeClients() {
+ public boolean shouldNotifyBridgeClients() {
return false;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 6c6e416..64e3a6a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1221,7 +1221,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
@Override
- protected boolean shouldNotifyBridgeClients() {
+ public boolean shouldNotifyBridgeClients() {
return false;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index c01b4b4..d8957ed 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -861,12 +861,12 @@ public class AbstractRegionMapTest {
AbstractRegionMap arm = new TxPutIfAbsentTestableAbstractRegionMap();
List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
TXId txId = new TXId(arm._getOwner().getMyId(), 1);
- TXRmtEvent txRmtEvent = mock(TXRmtEvent.class);
EventID eventId = mock(EventID.class);
+ TXEntryState txEntryState = mock(TXEntryState.class);
Object newValue = "value";
- arm.txApplyPut(Operation.UPDATE, KEY, newValue, false, txId, txRmtEvent, eventId, null,
- pendingCallbacks, null, null, null, null, 1);
+ arm.txApplyPut(Operation.UPDATE, KEY, newValue, false, txId, null, eventId, null,
+ pendingCallbacks, null, null, txEntryState, null, 1);
assertEquals(1, pendingCallbacks.size());
verify(arm._getOwner(), times(1)).txApplyPutPart2(any(), any(), anyLong(), anyBoolean(),
@@ -913,14 +913,6 @@ public class AbstractRegionMapTest {
false);
}
- private EntryEventImpl createEventForCreate(LocalRegion lr, String key) {
- when(lr.getKeyInfo(key)).thenReturn(new KeyInfo(key, null, null));
- EntryEventImpl event =
- EntryEventImpl.create(lr, Operation.CREATE, key, false, null, true, false);
- event.setNewValue("create_value");
- return event;
- }
-
private static class TxNoRegionEntryTestableAbstractRegionMap
extends TxTestableAbstractRegionMap {
@Override
@@ -957,7 +949,7 @@ public class AbstractRegionMapTest {
@Override
public RegionEntry getEntry(Object key) {
RegionEntry regionEntry = mock(RegionEntry.class);
- when(regionEntry.isRemoved()).thenReturn(true);
+ when(regionEntry.isDestroyedOrRemoved()).thenReturn(true);
return regionEntry;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/map/AbstractRegionMapPutTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/map/AbstractRegionMapPutTest.java
new file mode 100644
index 0000000..02a70a8
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/map/AbstractRegionMapPutTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.cache.query.internal.index.IndexManager;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.RegionEntryFactory;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class AbstractRegionMapPutTest {
+ private final InternalRegion internalRegion = mock(InternalRegion.class);
+ private final FocusedRegionMap focusedRegionMap = mock(FocusedRegionMap.class);
+ @SuppressWarnings("rawtypes")
+ private final Map entryMap = mock(Map.class);
+ private final EntryEventImpl event = mock(EntryEventImpl.class);
+ private final RegionEntry createdRegionEntry = mock(RegionEntry.class);
+ private final TestableRegionMapPut instance = spy(new TestableRegionMapPut());
+
+ @Before
+ public void setup() {
+ RegionEntryFactory regionEntryFactory = mock(RegionEntryFactory.class);
+ when(regionEntryFactory.createEntry(any(), any(), any())).thenReturn(createdRegionEntry);
+ when(focusedRegionMap.getEntryFactory()).thenReturn(regionEntryFactory);
+ when(focusedRegionMap.getEntryMap()).thenReturn(entryMap);
+ when(internalRegion.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
+ }
+
+ @Test
+ public void validateOwnerInitialized() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+
+ TestableRegionMapPut testableRegionMapPut = new TestableRegionMapPut();
+
+ assertThat(testableRegionMapPut.isOwnerInitialized()).isTrue();
+ }
+
+ @Test
+ public void validateOwnerUninitialized() {
+ when(internalRegion.isInitialized()).thenReturn(false);
+
+ TestableRegionMapPut testableRegionMapPut = new TestableRegionMapPut();
+
+ assertThat(testableRegionMapPut.isOwnerInitialized()).isFalse();
+ }
+
+ @Test
+ public void validateSetLastModifiedTime() {
+ instance.setLastModifiedTime(99L);
+
+ assertThat(instance.getLastModifiedTime()).isEqualTo(99L);
+ }
+
+ @Test
+ public void validateSetClearOccurred() {
+ instance.setClearOccurred(true);
+
+ assertThat(instance.isClearOccurred()).isTrue();
+ }
+
+ @Test
+ public void putWithUnsatisfiedPreconditionsReturnsNull() {
+ instance.checkPreconditions = false;
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNull();
+ verify(focusedRegionMap, times(1)).getEntry(eq(event));
+ verify(focusedRegionMap, times(1)).putEntryIfAbsent(any(), eq(createdRegionEntry));
+ verify(instance, times(1)).isOnlyExisting();
+ verify(instance, never()).entryExists(any());
+ verify(instance, times(1)).serializeNewValueIfNeeded();
+ verify(instance, times(1)).runWhileLockedForCacheModification(any());
+ verify(instance, times(1)).setOldValueForDelta();
+ verify(instance, times(1)).setOldValueInEvent();
+ verify(instance, times(1)).unsetOldValueForDelta();
+ verify(instance, times(1)).checkPreconditions();
+ verify(instance, never()).invokeCacheWriter();
+ verify(instance, never()).createOrUpdateEntry();
+ verify(instance, times(1)).shouldCreatedEntryBeRemoved();
+ verify(instance, never()).doBeforeCompletionActions();
+ verify(instance, times(1)).doAfterCompletionActions();
+ }
+
+ @Test
+ public void putWithShouldCreatedEntryBeRemovedCallsRemoveEntry() {
+ instance.shouldCreatedEntryBeRemoved = true;
+
+ instance.put();
+
+ verify(focusedRegionMap, times(1)).removeEntry(any(), eq(createdRegionEntry), eq(false));
+ }
+
+ @Test
+ public void putWithOqlIndexManagerCallInitAndCountDown() {
+ IndexManager oqlIndexManager = mock(IndexManager.class);
+ when(internalRegion.getIndexManager()).thenReturn(oqlIndexManager);
+ instance.checkPreconditions = true;
+
+ instance.put();
+
+ InOrder inOrder = inOrder(oqlIndexManager, instance);
+ inOrder.verify(oqlIndexManager, times(1)).waitForIndexInit();
+ inOrder.verify(instance, times(1)).createOrUpdateEntry();
+ inOrder.verify(oqlIndexManager, times(1)).countDownIndexUpdaters();
+ }
+
+ @Test
+ public void putCallsHandleDiskAccessExceptionWhenThrownDuringPut() {
+ instance.checkPreconditions = true;
+ doThrow(DiskAccessException.class).when(instance).createOrUpdateEntry();
+
+ assertThatThrownBy(() -> instance.put()).isInstanceOf(DiskAccessException.class);
+
+ verify(internalRegion, times(1)).handleDiskAccessException(any());
+ }
+
+ @Test
+ public void putWithSatisfiedPreconditionsAndNoExistingEntryReturnsRegionEntryFromFactory() {
+ instance.checkPreconditions = true;
+ when(focusedRegionMap.getEntry(event)).thenReturn(null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(createdRegionEntry);
+ verifyMapContractWhenCreateSucceeds();
+ verifyAbstractContract();
+ }
+
+
+ @Test
+ public void regionWithIndexMaintenanceSynchronousCallsSetUpdateInProgress() {
+ when(internalRegion.getIndexMaintenanceSynchronous()).thenReturn(true);
+ instance.checkPreconditions = true;
+
+ instance.put();
+
+ InOrder inOrder = inOrder(createdRegionEntry, instance);
+ inOrder.verify(createdRegionEntry, times(1)).setUpdateInProgress(true);
+ inOrder.verify(instance, times(1)).createOrUpdateEntry();
+ inOrder.verify(createdRegionEntry, times(1)).setUpdateInProgress(false);
+ }
+
+ @Test
+ public void putWithOnlyExistingTrueAndNoEntryExistsReturnsNull() {
+ instance.onlyExisting = true;
+ instance.entryExists = false;
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNull();
+ verify(focusedRegionMap, times(1)).getEntry(eq(event));
+ verify(focusedRegionMap, never()).putEntryIfAbsent(any(), any());
+ verifyAbstractContract();
+ }
+
+ @Test
+ public void putWithExistingEntryReturnsExistingEntry() {
+ instance.checkPreconditions = true;
+ instance.onlyExisting = true;
+ instance.entryExists = true;
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(focusedRegionMap, times(1)).getEntry(eq(event));
+ verify(focusedRegionMap, never()).getEntryFactory();
+ verify(focusedRegionMap, never()).putEntryIfAbsent(any(), eq(createdRegionEntry));
+ verifyAbstractContract();
+ }
+
+ @Test
+ public void putWithExistingEntryFromPutIfAbsentReturnsExistingEntry() {
+ instance.checkPreconditions = true;
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.putEntryIfAbsent(any(), eq(createdRegionEntry)))
+ .thenReturn(existingEntry);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(focusedRegionMap, times(1)).getEntry(eq(event));
+ verify(focusedRegionMap, times(1)).getEntryFactory();
+ verifyAbstractContract();
+ }
+
+ @Test
+ public void putWithExistingEntryFromPutIfAbsentThatIsRemovedReturnsExistingEntry() {
+ instance.checkPreconditions = true;
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.isRemovedPhase2()).thenReturn(true).thenReturn(false);
+ when(focusedRegionMap.putEntryIfAbsent(any(), eq(createdRegionEntry)))
+ .thenReturn(existingEntry);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(focusedRegionMap, times(2)).getEntry(eq(event));
+ verify(focusedRegionMap, times(2)).getEntryFactory();
+ verify(entryMap, times(1)).remove(any(), eq(existingEntry));
+ verifyAbstractContractWithRetry();
+ }
+
+ private void verifyMapContractWhenCreateSucceeds() {
+ verify(focusedRegionMap, times(1)).getEntry(eq(event));
+ verify(focusedRegionMap, times(1)).putEntryIfAbsent(any(), eq(createdRegionEntry));
+ }
+
+ private void verifyAbstractContractWithRetry() {
+ verify(instance, times(2)).isOnlyExisting();
+ verifyCommonAbstractContract();
+ }
+
+ private void verifyAbstractContract() {
+ verify(instance, times(1)).isOnlyExisting();
+ verifyCommonAbstractContract();
+ }
+
+ private void verifyCommonAbstractContract() {
+ if (instance.onlyExisting) {
+ verify(instance, times(1)).entryExists(any());
+ } else {
+ verify(instance, never()).entryExists(any());
+ }
+ verify(instance, times(1)).serializeNewValueIfNeeded();
+ verify(instance, times(1)).runWhileLockedForCacheModification(any());
+ if (!instance.onlyExisting || instance.entryExists) {
+ verify(instance, times(1)).setOldValueForDelta();
+ verify(instance, times(1)).setOldValueInEvent();
+ verify(instance, times(1)).unsetOldValueForDelta();
+ verify(instance, times(1)).checkPreconditions();
+ } else {
+ verify(instance, never()).setOldValueForDelta();
+ verify(instance, never()).setOldValueInEvent();
+ verify(instance, never()).unsetOldValueForDelta();
+ verify(instance, never()).checkPreconditions();
+ }
+ if (instance.checkPreconditions || !instance.onlyExisting || instance.entryExists) {
+ verify(instance, times(1)).invokeCacheWriter();
+ verify(instance, times(1)).createOrUpdateEntry();
+ verify(instance, times(1)).doBeforeCompletionActions();
+ verify(instance, times(instance.isCreate() ? 1 : 0)).shouldCreatedEntryBeRemoved();
+ } else {
+ verify(instance, never()).invokeCacheWriter();
+ verify(instance, never()).createOrUpdateEntry();
+ verify(instance, never()).doBeforeCompletionActions();
+ verify(instance, never()).shouldCreatedEntryBeRemoved();
+ }
+ verify(instance, times(1)).doAfterCompletionActions();
+ }
+
+ private class TestableRegionMapPut extends AbstractRegionMapPut {
+ public boolean checkPreconditions;
+ public boolean onlyExisting;
+ public boolean entryExists;
+ public boolean shouldCreatedEntryBeRemoved;
+
+ public TestableRegionMapPut() {
+ super(focusedRegionMap, internalRegion, event);
+ }
+
+ @Override
+ protected boolean isOnlyExisting() {
+ return onlyExisting;
+ }
+
+ @Override
+ protected boolean entryExists(RegionEntry regionEntry) {
+ return entryExists;
+ }
+
+ @Override
+ protected void serializeNewValueIfNeeded() {}
+
+ @Override
+ protected void runWhileLockedForCacheModification(Runnable r) {
+ r.run();
+ }
+
+ @Override
+ protected void setOldValueForDelta() {}
+
+ @Override
+ protected void setOldValueInEvent() {}
+
+ @Override
+ protected void unsetOldValueForDelta() {}
+
+ @Override
+ protected boolean checkPreconditions() {
+ return checkPreconditions;
+ }
+
+ @Override
+ protected void invokeCacheWriter() {}
+
+ @Override
+ protected void createOrUpdateEntry() {}
+
+ @Override
+ protected void doBeforeCompletionActions() {}
+
+ @Override
+ protected boolean shouldCreatedEntryBeRemoved() {
+ return shouldCreatedEntryBeRemoved;
+ }
+
+ @Override
+ protected void doAfterCompletionActions() {}
+
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapCommitPutTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapCommitPutTest.java
new file mode 100644
index 0000000..2815cf7
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapCommitPutTest.java
@@ -0,0 +1,704 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionClearedException;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.RegionEntryFactory;
+import org.apache.geode.internal.cache.TXEntryState;
+import org.apache.geode.internal.cache.TXRmtEvent;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class RegionMapCommitPutTest {
+ private final InternalRegion internalRegion = mock(InternalRegion.class);
+ private final FocusedRegionMap focusedRegionMap = mock(FocusedRegionMap.class);
+ @SuppressWarnings("rawtypes")
+ private final Map entryMap = mock(Map.class);
+ private final EntryEventImpl event = mock(EntryEventImpl.class);
+ private final RegionEntry regionEntry = mock(RegionEntry.class);
+ private final TransactionId transactionId = mock(TransactionId.class);
+ private final TXRmtEvent txRmtEvent = mock(TXRmtEvent.class);
+ private final List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+ private final TXEntryState localTxEntryState = mock(TXEntryState.class);
+ private final InternalDistributedMember myId = mock(InternalDistributedMember.class);
+ private final InternalDistributedMember remoteId = mock(InternalDistributedMember.class);
+ private final Object key = "theKey";
+ private final Object newValue = "newValue";
+ private RegionMapCommitPut instance;
+
+ @Before
+ public void setup() {
+ RegionEntryFactory regionEntryFactory = mock(RegionEntryFactory.class);
+ when(regionEntryFactory.createEntry(any(), any(), any())).thenReturn(regionEntry);
+ when(focusedRegionMap.getEntryFactory()).thenReturn(regionEntryFactory);
+ when(focusedRegionMap.getEntryMap()).thenReturn(entryMap);
+ when(internalRegion.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
+ when(internalRegion.getCache()).thenReturn(mock(InternalCache.class));
+ when(internalRegion.getMyId()).thenReturn(myId);
+ when(transactionId.getMemberId()).thenReturn(myId);
+ when(event.getRawNewValueAsHeapObject()).thenReturn(newValue);
+ when(event.getKey()).thenReturn(key);
+ }
+
+ private void createInstance(Operation putOp, boolean didDestroy, TXRmtEvent txEvent,
+ TXEntryState txEntryState) {
+ instance = new RegionMapCommitPut(focusedRegionMap, internalRegion, event, putOp, didDestroy,
+ transactionId, txEvent, pendingCallbacks, txEntryState);
+ }
+
+ @Test
+ public void localCreateIsNotRemoteOrigin() {
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isRemoteOrigin()).isFalse();
+ }
+
+ @Test
+ public void remoteCreateIsRemoteOrigin() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+
+ createInstance(Operation.CREATE, false, txRmtEvent, null);
+
+ assertThat(instance.isRemoteOrigin()).isTrue();
+ }
+
+ @Test
+ public void localCreateIsNotOnlyExisting() {
+ when(transactionId.getMemberId()).thenReturn(myId);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isOnlyExisting()).isFalse();
+ }
+
+ @Test
+ public void remoteCreateWithNotAllEventsIsOnlyExisting() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.isAllEvents()).thenReturn(false);
+
+ createInstance(Operation.CREATE, false, txRmtEvent, null);
+
+ assertThat(instance.isOnlyExisting()).isTrue();
+ }
+
+ @Test
+ public void remoteCreateWithLocalTxEntryStateIsNotOnlyExisting() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+
+ createInstance(Operation.CREATE, false, txRmtEvent, localTxEntryState);
+
+ assertThat(instance.isOnlyExisting()).isFalse();
+ }
+
+ @Test
+ public void remoteCreateWithAllEventsIsNotOnlyExisting() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.isAllEvents()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, txRmtEvent, null);
+
+ assertThat(instance.isOnlyExisting()).isFalse();
+ }
+
+ @Test
+ public void remoteUpdateWithAllEventsIsNotOnlyExisting() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.isAllEvents()).thenReturn(true);
+
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ assertThat(instance.isOnlyExisting()).isFalse();
+ }
+
+ @Test
+ public void uninitializedNonPartitionedRegionDoesNotInvokeCallbacks() {
+ when(internalRegion.shouldDispatchListenerEvent()).thenReturn(true);
+ when(internalRegion.shouldNotifyBridgeClients()).thenReturn(true);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isFalse();
+ }
+
+ //
+ @Test
+ public void uninitializedPartitionedRegionDoesNotInvokeCallbacks() {
+ when(internalRegion.isUsedForPartitionedRegionBucket()).thenReturn(true);
+ when(internalRegion.getPartitionedRegion()).thenReturn(mock(LocalRegion.class));
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isFalse();
+ }
+
+ @Test
+ public void initializedNonPartitionedRegionWithFalseAttributesDoesNotInvokeCallbacks() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isFalse();
+ }
+
+ @Test
+ public void initializedNonPartitionedRegionWithShouldDispatchListenerEventDoesInvokeCallbacks() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.shouldDispatchListenerEvent()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ }
+
+ @Test
+ public void initializedNonPartitionedRegionWithShouldNotifyBridgeClientsDoesInvokeCallbacks() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.shouldNotifyBridgeClients()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ }
+
+ @Test
+ public void initializedNonPartitionedRegionWithConcurrencyChecksEnabledDoesInvokeCallbacks() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ }
+
+ @Test
+ public void uninitializedPartitionedRegionWithFalseAttributesDoesNotInvokeCallbacks() {
+ when(internalRegion.isUsedForPartitionedRegionBucket()).thenReturn(true);
+ when(internalRegion.getPartitionedRegion()).thenReturn(mock(LocalRegion.class));
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isFalse();
+ }
+
+ @Test
+ public void uninitializedPartitionedRegionWithShouldDispatchListenerEventDoesInvokeCallbacks() {
+ when(internalRegion.isUsedForPartitionedRegionBucket()).thenReturn(true);
+ LocalRegion partitionedRegion = mock(LocalRegion.class);
+ when(internalRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.shouldDispatchListenerEvent()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ }
+
+ @Test
+ public void uninitializedPartitionedRegionWithShouldNotifyBridgeClientsDoesInvokeCallbacks() {
+ when(internalRegion.isUsedForPartitionedRegionBucket()).thenReturn(true);
+ LocalRegion partitionedRegion = mock(LocalRegion.class);
+ when(internalRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.shouldNotifyBridgeClients()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ }
+
+ @Test
+ public void uninitializedPartitionedRegionWithConcurrencyChecksEnabledDoesInvokeCallbacks() {
+ when(internalRegion.isUsedForPartitionedRegionBucket()).thenReturn(true);
+ LocalRegion partitionedRegion = mock(LocalRegion.class);
+ when(internalRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ }
+
+ @Test
+ public void remoteUpdateWithAllEventsAndInitializedIsOnlyExisting() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.isAllEvents()).thenReturn(true);
+ when(internalRegion.isInitialized()).thenReturn(true);
+
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ assertThat(instance.isOnlyExisting()).isTrue();
+ }
+
+ @Test
+ public void successfulPutCallsUpdateStatsForPut() {
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(regionEntry);
+ final long lastModifiedTime = instance.getLastModifiedTime();
+ verify(regionEntry, times(1)).updateStatsForPut(eq(lastModifiedTime), eq(lastModifiedTime));
+ }
+
+ @Test
+ public void doBeforeCompletionActionsCallsTxApplyPutPart2() {
+ final boolean didDestroy = true;
+ createInstance(Operation.CREATE, didDestroy, null, localTxEntryState);
+
+ instance.put();
+
+ final long lastModifiedTime = instance.getLastModifiedTime();
+ verify(internalRegion, times(1)).txApplyPutPart2(eq(regionEntry), eq(key), eq(lastModifiedTime),
+ eq(true), eq(didDestroy), eq(false));
+ }
+
+ @Test
+ public void putThatDoesNotAddEventToPendingCallbacksCallsEventRelease() {
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ instance.put();
+
+ assertThat(instance.isCallbackEventInPending()).isFalse();
+ assertThat(pendingCallbacks).doesNotContain(event);
+ verify(event, never()).changeRegionToBucketsOwner();
+ verify(event, never()).setOriginRemote(anyBoolean());
+ verify(event, times(1)).release();
+ }
+
+ @Test
+ public void putThatDoesAddEventToPendingCallbacksDoesNotCallEventRelease() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNotNull();
+ verify(event, never()).release();
+ }
+
+ @Test
+ public void putThatDoesNotInvokesCallbacksDoesNotAddToPendingCallbacks() {
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ instance.put();
+
+ assertThat(instance.isCallbackEventInPending()).isFalse();
+ assertThat(pendingCallbacks).doesNotContain(event);
+ verify(event, never()).changeRegionToBucketsOwner();
+ verify(event, never()).setOriginRemote(anyBoolean());
+ }
+
+ @Test
+ public void putThatInvokesCallbacksAddsToPendingCallbacks() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNotNull();
+ assertThat(instance.isCallbackEventInPending()).isTrue();
+ assertThat(pendingCallbacks).contains(event);
+ verify(event, times(1)).changeRegionToBucketsOwner();
+ verify(event, times(1)).setOriginRemote(instance.isRemoteOrigin());
+ }
+
+ @Test
+ public void updateThatDoesNotSeeClearCallsLruEntryUpdate() {
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ instance.put();
+
+ verify(focusedRegionMap, times(1)).lruEntryUpdate(existingEntry);
+ }
+
+ @Test
+ public void createThatDoesNotSeeClearCallsLruEntryCreate() {
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ instance.put();
+
+ verify(focusedRegionMap, times(1)).lruEntryCreate(regionEntry);
+ verify(focusedRegionMap, times(1)).incEntryCount(1);
+ }
+
+ @Test
+ public void createCallsUpdateSizeOnCreate() {
+ final int newSize = 79;
+ when(internalRegion.calculateRegionEntryValueSize(eq(regionEntry))).thenReturn(newSize);
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ instance.put();
+
+ verify(internalRegion, times(1)).updateSizeOnCreate(eq(key), eq(newSize));
+ }
+
+ @Test
+ public void updateCallsUpdateSizeOnPut() {
+ final int oldSize = 12;
+ final int newSize = 79;
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ when(internalRegion.calculateRegionEntryValueSize(eq(existingEntry))).thenReturn(oldSize)
+ .thenReturn(newSize);
+ createInstance(Operation.UPDATE, false, null, localTxEntryState);
+
+ instance.put();
+
+ verify(internalRegion, times(1)).updateSizeOnPut(eq(key), eq(oldSize), eq(newSize));
+ }
+
+ @Test
+ public void createThatDoesSeeClearDoesNotMakeLruCalls() throws RegionClearedException {
+ doThrow(RegionClearedException.class).when(regionEntry).setValue(any(), any());
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ instance.put();
+
+ assertThat(instance.isClearOccurred()).isTrue();
+ verify(focusedRegionMap, never()).lruEntryUpdate(any());
+ verify(focusedRegionMap, never()).lruEntryCreate(any());
+ verify(focusedRegionMap, never()).incEntryCount(1);
+ }
+
+ @Test
+ public void putWithoutConcurrencyChecksEnabledDoesNotCallSetVersionTag() {
+ createInstance(Operation.UPDATE, false, null, localTxEntryState);
+
+ instance.put();
+
+ verify(localTxEntryState, never()).setVersionTag(any());
+ }
+
+ @Test
+ public void putWithConcurrencyChecksEnabledDoesCallSetVersionTag() {
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ VersionTag versionTag = mock(VersionTag.class);
+ when(event.getVersionTag()).thenReturn(versionTag);
+ createInstance(Operation.UPDATE, false, null, localTxEntryState);
+
+ instance.put();
+
+ verify(localTxEntryState, times(1)).setVersionTag(eq(versionTag));
+ }
+
+ @Test
+ public void failedUpdateWithDidDestroyDoesCallTxApplyPutHandleDidDestroy() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ createInstance(Operation.UPDATE, true, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNull();
+ assertThat(instance.isOnlyExisting()).isTrue();
+ verify(internalRegion, times(1)).txApplyPutHandleDidDestroy(eq(key));
+ }
+
+ @Test
+ public void successfulUpdateWithDidDestroyDoesNotCallTxApplyPutHandleDidDestroy() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, true, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNotNull();
+ assertThat(instance.isOnlyExisting()).isTrue();
+ verify(internalRegion, never()).txApplyPutHandleDidDestroy(any());
+ }
+
+ @Test
+ public void failedUpdateDoesCallInvokeTXCallbacks() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNull();
+ assertThat(instance.isOnlyExisting()).isTrue();
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ verify(event, times(1)).makeUpdate();
+ verify(internalRegion, times(1)).invokeTXCallbacks(eq(EnumListenerEvent.AFTER_UPDATE),
+ eq(event), eq(false));
+ }
+
+ @Test
+ public void successfulUpdateDoesNotCallInvokeTXCallbacks() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNotNull();
+ assertThat(instance.isOnlyExisting()).isTrue();
+ assertThat(instance.isInvokeCallbacks()).isTrue();
+ verify(internalRegion, never()).invokeTXCallbacks(any(), any(), anyBoolean());
+ }
+
+ @Test
+ public void localUpdateSetsOldValueOnEvent() {
+ Object oldValue = new Object();
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.getValueInVM(any())).thenReturn(oldValue);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, false, null, localTxEntryState);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(event, times(1)).setOldValue(oldValue);
+ verify(existingEntry, never()).txDidDestroy(anyLong());
+ }
+
+ @Test
+ public void localUpdateThatAlsoDidDestroyCallsTxDidDestroy() {
+ Object oldValue = new Object();
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.getValueInVM(any())).thenReturn(oldValue);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ final long lastModifiedTime = 123L;
+ when(internalRegion.cacheTimeMillis()).thenReturn(lastModifiedTime);
+ createInstance(Operation.UPDATE, true, null, localTxEntryState);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(event, times(1)).setOldValue(oldValue);
+ verify(existingEntry, times(1)).txDidDestroy(lastModifiedTime);
+ }
+
+ @Test
+ public void localCreateDoesSetsOldValueToNullOnEvent() {
+ createInstance(Operation.CREATE, false, null, localTxEntryState);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(regionEntry);
+ verify(event, times(1)).setOldValue(null);
+ }
+
+ @Test
+ public void localCreateCallsProcessAndGenerateTXVersionTag() {
+ createInstance(Operation.SEARCH_CREATE, false, null, localTxEntryState);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(regionEntry);
+ verify(focusedRegionMap, times(1)).processAndGenerateTXVersionTag(eq(event), eq(regionEntry),
+ eq(localTxEntryState));
+ }
+
+ @Test
+ public void localSearchCreateCallsSetValueResultOfSearchWithTrue() {
+ createInstance(Operation.SEARCH_CREATE, false, null, localTxEntryState);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(regionEntry);
+ verify(regionEntry, times(1)).setValueResultOfSearch(true);
+ }
+
+ @Test
+ public void remoteUpdateWithOnlyExistingSucceeds() throws Exception {
+ when(internalRegion.isAllEvents()).thenReturn(true);
+ when(internalRegion.isInitialized()).thenReturn(true);
+ Object oldValue = new Object();
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.getValueInVM(any())).thenReturn(oldValue);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ when(existingEntry.prepareValueForCache(any(), eq(newValue), eq(event), eq(true)))
+ .thenReturn(newValue);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(existingEntry, times(1)).setValue(eq(internalRegion), eq(newValue));
+ }
+
+ @Test
+ public void remoteUpdateWithOnlyExistingCallsAddPut() throws Exception {
+ when(internalRegion.isAllEvents()).thenReturn(true);
+ when(internalRegion.isInitialized()).thenReturn(true);
+ Object oldValue = new Object();
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.getValueInVM(any())).thenReturn(oldValue);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ when(existingEntry.prepareValueForCache(any(), eq(newValue), eq(event), eq(true)))
+ .thenReturn(newValue);
+ final Object callbackArgument = "callbackArgument";
+ when(event.getCallbackArgument()).thenReturn(callbackArgument);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(txRmtEvent, times(1)).addPut(eq(Operation.UPDATE), eq(internalRegion), eq(existingEntry),
+ eq(key), eq(newValue), eq(callbackArgument));
+ }
+
+ @Test
+ public void remoteUpdateWithInvalidateWithOnlyExistingSucceeds() throws Exception {
+ when(internalRegion.isAllEvents()).thenReturn(true);
+ when(internalRegion.isInitialized()).thenReturn(true);
+ Object oldValue = new Object();
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.getValueInVM(any())).thenReturn(oldValue);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ when(event.getRawNewValueAsHeapObject()).thenReturn(null);
+ when(existingEntry.prepareValueForCache(any(), eq(Token.INVALID), eq(event), eq(true)))
+ .thenReturn(Token.INVALID);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(existingEntry, times(1)).setValue(eq(internalRegion), eq(Token.INVALID));
+ }
+
+ @Test
+ public void remoteUpdateWithLocalInvalidateWithOnlyExistingSucceeds() throws Exception {
+ when(internalRegion.isAllEvents()).thenReturn(true);
+ when(internalRegion.isInitialized()).thenReturn(true);
+ Object oldValue = new Object();
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.getValueInVM(any())).thenReturn(oldValue);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ when(event.getRawNewValueAsHeapObject()).thenReturn(null);
+ when(event.isLocalInvalid()).thenReturn(true);
+ when(existingEntry.prepareValueForCache(any(), eq(Token.LOCAL_INVALID), eq(event), eq(true)))
+ .thenReturn(Token.LOCAL_INVALID);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isSameAs(existingEntry);
+ verify(existingEntry, times(1)).setValue(eq(internalRegion), eq(Token.LOCAL_INVALID));
+ }
+
+ @Test
+ public void remoteUpdateWithOnlyExistingOnRemovedEntryFails() {
+ when(transactionId.getMemberId()).thenReturn(remoteId);
+ when(internalRegion.isAllEvents()).thenReturn(true);
+ when(internalRegion.isInitialized()).thenReturn(true);
+ Object oldValue = new Object();
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.getValueInVM(any())).thenReturn(oldValue);
+ when(existingEntry.isDestroyedOrRemoved()).thenReturn(false).thenReturn(true);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNull();
+ }
+
+ @Test
+ public void putThatUpdatesTombstoneCallsUnscheduleTombstone() {
+ when(internalRegion.isInitialized()).thenReturn(true);
+ when(internalRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+ RegionEntry existingEntry = mock(RegionEntry.class);
+ when(existingEntry.isTombstone()).thenReturn(true);
+ when(focusedRegionMap.getEntry(eq(event))).thenReturn(existingEntry);
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ RegionEntry result = instance.put();
+
+ assertThat(result).isNotNull();
+ verify(internalRegion, times(1)).unscheduleTombstone(eq(existingEntry));
+ }
+
+
+
+ @Test
+ public void entryExistsWithNullReturnsFalse() {
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+
+ boolean result = instance.entryExists(null);
+
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ public void entryExistsWithRemovedEntryReturnsFalse() {
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+ RegionEntry regionEntry = mock(RegionEntry.class);
+ when(regionEntry.isDestroyedOrRemoved()).thenReturn(true);
+
+ boolean result = instance.entryExists(regionEntry);
+
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ public void entryExistsWithExistingEntryReturnsTrue() {
+ createInstance(Operation.UPDATE, false, txRmtEvent, null);
+ RegionEntry regionEntry = mock(RegionEntry.class);
+
+ boolean result = instance.entryExists(regionEntry);
+
+ assertThat(result).isTrue();
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapPutTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapPutTest.java
index 2394221..391575e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapPutTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapPutTest.java
@@ -33,6 +33,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Scope;
import org.apache.geode.internal.cache.CachePerfStats;
@@ -59,6 +60,7 @@ public class RegionMapPutTest {
private boolean requireOldValue = false;
private Object expectedOldValue = null;
private boolean overwriteDestroyed = false;
+ private RegionMapPut instance;
@Before
public void setup() {
@@ -76,14 +78,160 @@ public class RegionMapPutTest {
when(regionEntry.isRemoved()).thenReturn(true);
}
+ private void createInstance() {
+ instance = new RegionMapPut(focusedRegionMap, internalRegion, cacheModificationLock,
+ entryEventSerialization, event, ifNew, ifOld, overwriteDestroyed, requireOldValue,
+ expectedOldValue);
+ }
+
private RegionEntry doPut() {
- RegionMapPut regionMapPut = new RegionMapPut(focusedRegionMap, internalRegion,
- cacheModificationLock, entryEventSerialization, event, ifNew, ifOld, overwriteDestroyed,
- requireOldValue, expectedOldValue);
- return regionMapPut.put();
+ createInstance();
+ return instance.put();
+ }
+
+ @Test
+ public void retrieveOldValueForDeltaDefaultToFalse() {
+ createInstance();
+
+ assertThat(instance.isRetrieveOldValueForDelta()).isFalse();
+ }
+
+ @Test
+ public void retrieveOldValueForDeltaTrueIfEventHasDeltaBytes() {
+ when(event.getDeltaBytes()).thenReturn(new byte[1]);
+
+ createInstance();
+
+ assertThat(instance.isRetrieveOldValueForDelta()).isTrue();
+ }
+
+ @Test
+ public void retrieveOldValueForDeltaFalseIfEventHasDeltaBytesAndRawNewValue() {
+ when(event.getDeltaBytes()).thenReturn(new byte[1]);
+ when(event.getRawNewValue()).thenReturn(new Object());
+
+ createInstance();
+
+ assertThat(instance.isRetrieveOldValueForDelta()).isFalse();
+ }
+
+ @Test
+ public void replaceOnClientDefaultsToFalse() {
+ createInstance();
+
+ assertThat(instance.isReplaceOnClient()).isFalse();
+ }
+
+ @Test
+ public void replaceOnClientIsTrueIfOperationIsReplaceAndOwnerIsClient() {
+ when(event.getOperation()).thenReturn(Operation.REPLACE);
+ when(internalRegion.hasServerProxy()).thenReturn(true);
+
+ createInstance();
+
+ assertThat(instance.isReplaceOnClient()).isTrue();
}
@Test
+ public void replaceOnClientIsFalseIfOperationIsReplaceAndOwnerIsNotClient() {
+ when(event.getOperation()).thenReturn(Operation.REPLACE);
+
+ createInstance();
+
+ assertThat(instance.isReplaceOnClient()).isFalse();
+ }
+
+ @Test
+ public void onlyExistingDefaultsToFalse() {
+ createInstance();
+
+ assertThat(instance.isOnlyExisting()).isFalse();
+ }
+
+ @Test
+ public void onlyExistingIsTrueIfOld() {
+ ifOld = true;
+
+ createInstance();
+
+ assertThat(instance.isOnlyExisting()).isTrue();
+ }
+
+ @Test
+ public void onlyExistingIsFalseIfOldAndReplaceOnClient() {
+ ifOld = true;
+ when(event.getOperation()).thenReturn(Operation.REPLACE);
+ when(internalRegion.hasServerProxy()).thenReturn(true);
+
+ createInstance();
+
+ assertThat(instance.isOnlyExisting()).isFalse();
+ }
+
+ @Test
+ public void cacheWriteDefaultToFalse() {
+ createInstance();
+
+ assertThat(instance.isCacheWrite()).isFalse();
+ }
+
+ @Test
+ public void cacheWriteIsFaseIfGenerateCallbacksButNotDistributedEtc() {
+ when(event.isGenerateCallbacks()).thenReturn(true);
+ createInstance();
+
+ assertThat(instance.isCacheWrite()).isFalse();
+ }
+
+ @Test
+ public void cacheWriteIsTrueIfGenerateCallbacksAndDistributed() {
+ when(event.isGenerateCallbacks()).thenReturn(true);
+ when(internalRegion.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+ createInstance();
+
+ assertThat(instance.isCacheWrite()).isTrue();
+ }
+
+ @Test
+ public void cacheWriteIsTrueIfGenerateCallbacksAndServerProxy() {
+ when(event.isGenerateCallbacks()).thenReturn(true);
+ when(internalRegion.hasServerProxy()).thenReturn(true);
+ createInstance();
+
+ assertThat(instance.isCacheWrite()).isTrue();
+ }
+
+ @Test
+ public void cacheWriteIsTrueIfGenerateCallbacksAndCacheWriter() {
+ when(event.isGenerateCallbacks()).thenReturn(true);
+ when(internalRegion.basicGetWriter()).thenReturn(mock(CacheWriter.class));
+ createInstance();
+
+ assertThat(instance.isCacheWrite()).isTrue();
+ }
+
+ @Test
+ public void isOriginRemoteCausesCacheWriteToBeFalse() {
+ when(event.isOriginRemote()).thenReturn(true);
+ when(event.isGenerateCallbacks()).thenReturn(true);
+ when(internalRegion.basicGetWriter()).thenReturn(mock(CacheWriter.class));
+ createInstance();
+
+ assertThat(instance.isCacheWrite()).isFalse();
+ }
+
+ @Test
+ public void netSearchCausesCacheWriteToBeFalse() {
+ when(event.isNetSearch()).thenReturn(true);
+ when(event.isGenerateCallbacks()).thenReturn(true);
+ when(internalRegion.basicGetWriter()).thenReturn(mock(CacheWriter.class));
+ createInstance();
+
+ assertThat(instance.isCacheWrite()).isFalse();
+ }
+
+
+ @Test
public void createOnEmptyMapAddsEntry() throws Exception {
ifNew = true;
when(event.getOperation()).thenReturn(Operation.CREATE);
--
To stop receiving notification emails like this one, please contact
dschneider@apache.org.