You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/05/02 18:20:29 UTC
[geode] branch develop updated: GEODE-5164: refactor txApplyPut (#1900)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 1666adc GEODE-5164: refactor txApplyPut (#1900)
1666adc is described below
commit 1666adc13ec578f7c3f1ae295c74242760512c1e
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed May 2 11:20:24 2018 -0700
GEODE-5164: refactor txApplyPut (#1900)
txApplyPut had three large blocks of code that have been refactored into one method.
So applyTxUpdateOnReplicateOrRedundantCopy no longer exists.
Also some methods on RegionMapPut have been renamed for clarity.
---
.../geode/internal/cache/AbstractRegionMap.java | 578 ++++++++++++---------
.../apache/geode/internal/cache/LocalRegion.java | 10 +
.../geode/internal/cache/map/RegionMapPut.java | 43 +-
.../internal/cache/AbstractRegionMapTest.java | 3 +
4 files changed, 354 insertions(+), 280 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index aad10dd..0e39632 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -2113,23 +2113,31 @@ public abstract class AbstractRegionMap
// "fix" for bug 32440
Assert.assertTrue(false, "The owner for RegionMap " + this + " is null");
}
-
- Operation putOp = p_putOp;
-
- Object newValue = nv;
-
final boolean hasRemoteOrigin = !((TXId) txId).getMemberId().equals(owner.getMyId());
final boolean isTXHost = txEntryState != null;
final boolean isClientTXOriginator = owner.getCache().isClient() && !hasRemoteOrigin;
final boolean isRegionReady = owner.isInitialized();
+ boolean onlyExisting = false;
+ if (hasRemoteOrigin && !isTXHost && !isClientTXOriginator) {
+ // If we are not a mirror then only apply the update to existing
+ // entries
+ //
+ // If we are a mirror then then only apply the update to
+ // existing entries when the operation is an update and we
+ // are initialized.
+ // Otherwise use the standard create/update logic
+ if (!owner.isAllEvents() || (!p_putOp.isCreate() && isRegionReady)) {
+ onlyExisting = true;
+ }
+ }
+ TxApplyPutContext txApplyPutContext = null;
@Released
- EntryEventImpl callbackEvent = null;
- boolean invokeCallbacks = shouldCreateCallbackEvent(owner, isRegionReady);
- boolean callbackEventInPending = false;
- callbackEvent =
- createTransactionCallbackEvent(owner, putOp, key, newValue, txId, txEvent, eventId,
+ final EntryEventImpl callbackEvent =
+ createTransactionCallbackEvent(owner, p_putOp, key, nv, txId, txEvent, eventId,
aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+ boolean invokeCallbacks = shouldCreateCallbackEvent(owner, isRegionReady);
try {
+ Object newValue = nv;
if (logger.isDebugEnabled()) {
logger.debug("txApplyPut callbackEvent={}", callbackEvent);
}
@@ -2139,7 +2147,10 @@ public abstract class AbstractRegionMap
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
- boolean opCompleted = false;
+ txApplyPutContext = new TxApplyPutContext(false, false, p_putOp, callbackEvent, onlyExisting,
+ newValue, didDestroy, txEvent, aCallbackArgument, pendingCallbacks, txEntryState,
+ hasRemoteOrigin, invokeCallbacks);
+
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
@@ -2148,171 +2159,21 @@ public abstract class AbstractRegionMap
oqlIndexManager.waitForIndexInit();
}
try {
- if (hasRemoteOrigin && !isTXHost && !isClientTXOriginator) {
- // If we are not a mirror then only apply the update to existing
- // entries
- //
- // If we are a mirror then then only apply the update to
- // existing entries when the operation is an update and we
- // are initialized.
- // Otherwise use the standard create/update logic
- if (!owner.isAllEvents() || (!putOp.isCreate() && isRegionReady)) {
- callbackEventInPending = applyTxUpdateOnReplicateOrRedundantCopy(key, nv, didDestroy,
- txEvent, aCallbackArgument, pendingCallbacks, txEntryState, owner, putOp, newValue,
- hasRemoteOrigin, callbackEvent, invokeCallbacks, opCompleted);
- return;
+ do {
+ txApplyPutContext.setRegionEntry(null);
+ if (!txApplyPutFindExistingEntry(txApplyPutContext, onlyExisting)) {
+ break;
}
- }
- RegionEntry newRe = getEntryFactory().createEntry(owner, key, Token.REMOVED_PHASE1);
- synchronized (newRe) {
- try {
- RegionEntry oldRe = putEntryIfAbsent(key, newRe);
- while (!opCompleted && oldRe != null) {
- synchronized (oldRe) {
- if (oldRe.isRemovedPhase2()) {
- owner.getCachePerfStats().incRetries();
- getEntryMap().remove(key, oldRe);
- oldRe = putEntryIfAbsent(key, newRe);
- } else {
- opCompleted = true;
- if (!oldRe.isRemoved()) {
- putOp = putOp.getCorrespondingUpdateOp();
- }
- // Net writers are not called for received transaction data
- final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
- final boolean oldIsRemoved = oldRe.isDestroyedOrRemoved();
- if (callbackEvent != null) {
- callbackEvent.setRegionEntry(oldRe);
- callbackEvent.setOldValue(oldRe.getValueInVM(owner)); // OFFHEAP eei
- }
- boolean clearOccured = false;
- // Set RegionEntry updateInProgress
- if (owner.getIndexMaintenanceSynchronous()) {
- oldRe.setUpdateInProgress(true);
- }
- try {
- txRemoveOldIndexEntry(putOp, oldRe);
- if (didDestroy) {
- oldRe.txDidDestroy(owner.cacheTimeMillis());
- }
- if (txEvent != null) {
- txEvent.addPut(putOp, owner, oldRe, oldRe.getKey(), newValue,
- aCallbackArgument);
- }
- oldRe.setValueResultOfSearch(putOp.isNetSearch());
- try {
- processAndGenerateTXVersionTag(owner, callbackEvent, oldRe, txEntryState);
- boolean wasTombstone = oldRe.isTombstone();
- {
- oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue,
- callbackEvent, !putOp.isCreate()));
- if (wasTombstone) {
- owner.unscheduleTombstone(oldRe);
- }
- }
- if (putOp.isCreate()) {
- owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(oldRe));
- } else if (putOp.isUpdate()) {
- // Rahul : fix for 41694. Negative bucket size can also be
- // an issue with normal GFE Delta and will have to be fixed
- // in a similar manner and may be this fix the the one for
- // other delta can be combined.
- {
- owner.updateSizeOnPut(key, oldSize,
- owner.calculateRegionEntryValueSize(oldRe));
- }
- }
- } catch (RegionClearedException rce) {
- clearOccured = true;
- }
- {
- long lastMod = owner.cacheTimeMillis();
- EntryLogger.logTXPut(_getOwnerObject(), key, nv);
- oldRe.updateStatsForPut(lastMod, lastMod);
- owner.txApplyPutPart2(oldRe, oldRe.getKey(), lastMod, false, didDestroy,
- clearOccured);
- }
- } finally {
- if (oldRe != null && owner.getIndexMaintenanceSynchronous()) {
- oldRe.setUpdateInProgress(false);
- }
- }
- if (invokeCallbacks) {
- if (!oldIsRemoved) {
- callbackEvent.makeUpdate();
- }
- switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
- pendingCallbacks.add(callbackEvent);
- callbackEventInPending = true;
- }
- if (!clearOccured) {
- lruEntryUpdate(oldRe);
- }
- }
- }
- }
- if (!opCompleted) {
- putOp = putOp.getCorrespondingCreateOp();
- if (callbackEvent != null) {
- callbackEvent.setRegionEntry(newRe);
- callbackEvent.setOldValue(null);
- }
- boolean clearOccured = false;
- // Set RegionEntry updateInProgress
- if (owner.getIndexMaintenanceSynchronous()) {
- newRe.setUpdateInProgress(true);
- }
- try {
- txRemoveOldIndexEntry(putOp, newRe);
- // creating a new entry
- if (didDestroy) {
- newRe.txDidDestroy(owner.cacheTimeMillis());
- }
- if (txEvent != null) {
- txEvent.addPut(putOp, owner, newRe, newRe.getKey(), newValue, aCallbackArgument);
- }
- newRe.setValueResultOfSearch(putOp.isNetSearch());
- try {
+ txApplyPutCreateNewEntryIfNeeded(txApplyPutContext);
+ } while (!addRegionEntryToMapAndDoTxPut(txApplyPutContext));
- processAndGenerateTXVersionTag(owner, callbackEvent, newRe, txEntryState);
- {
- newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, callbackEvent,
- !putOp.isCreate()));
- }
- owner.updateSizeOnCreate(newRe.getKey(),
- owner.calculateRegionEntryValueSize(newRe));
- } catch (RegionClearedException rce) {
- clearOccured = true;
- }
- {
- long lastMod = owner.cacheTimeMillis();
- EntryLogger.logTXPut(_getOwnerObject(), key, nv);
- newRe.updateStatsForPut(lastMod, lastMod);
- owner.txApplyPutPart2(newRe, newRe.getKey(), lastMod, true, didDestroy,
- clearOccured);
- }
- } finally {
- if (newRe != null && owner.getIndexMaintenanceSynchronous()) {
- newRe.setUpdateInProgress(false);
- }
- }
- opCompleted = true;
- if (invokeCallbacks) {
- callbackEvent.makeCreate();
- callbackEvent.setOldValue(null);
- switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
- pendingCallbacks.add(callbackEvent);
- callbackEventInPending = true;
- }
- if (!clearOccured) {
- lruEntryCreate(newRe);
- incEntryCount(1);
- }
- }
- } finally {
- if (!opCompleted) {
- removeEntry(key, newRe, false);
- }
+ if (onlyExisting && !txApplyPutContext.isOpCompleted()) {
+ if (didDestroy) {
+ owner.txApplyPutHandleDidDestroy(key);
+ }
+ if (invokeCallbacks) {
+ callbackEvent.makeUpdate();
+ owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, callbackEvent, false);
}
}
if (owner.getConcurrencyChecksEnabled() && txEntryState != null && callbackEvent != null) {
@@ -2327,104 +2188,303 @@ public abstract class AbstractRegionMap
}
}
} finally {
- if (!callbackEventInPending)
+ if (txApplyPutContext == null || !txApplyPutContext.isCallbackEventInPending())
callbackEvent.release();
}
}
- private boolean applyTxUpdateOnReplicateOrRedundantCopy(Object key, Object nv, boolean didDestroy,
- TXRmtEvent txEvent, Object aCallbackArgument, List<EntryEventImpl> pendingCallbacks,
- TXEntryState txEntryState, LocalRegion owner, Operation putOp, Object newValue,
- boolean hasRemoteOrigin, EntryEventImpl callbackEvent, boolean invokeCallbacks,
- boolean opCompleted) {
- boolean result = false;
- // At this point we should only apply the update if the entry exists
- RegionEntry re = getEntry(key); // Fix for bug 32347.
- if (re != null) {
- synchronized (re) {
- if (!re.isRemoved()) {
- opCompleted = true;
- putOp = putOp.getCorrespondingUpdateOp();
- // Net writers are not called for received transaction data
- final int oldSize = owner.calculateRegionEntryValueSize(re);
- if (callbackEvent != null) {
- callbackEvent.setRegionEntry(re);
- callbackEvent.setOldValue(re.getValueInVM(owner)); // OFFHEAP eei
- }
+ private boolean addRegionEntryToMapAndDoTxPut(TxApplyPutContext txApplyPutContext) {
+ final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
+ synchronized (regionEntry) {
+ if (txApplyPutContext.isCreate()) {
+ RegionEntry oldRe = putEntryIfAbsent(txApplyPutContext.getEvent().getKey(), regionEntry);
+ if (oldRe != null) {
+ txApplyPutContext.setCreate(false);
+ txApplyPutContext.setRegionEntry(oldRe);
+ }
+ }
+ return doTxPutOnRegionEntryInMap(txApplyPutContext);
+ }
+ }
- boolean clearOccured = false;
- // Set RegionEntry updateInProgress
- if (owner.getIndexMaintenanceSynchronous()) {
- re.setUpdateInProgress(true);
- }
- try {
- txRemoveOldIndexEntry(putOp, re);
- if (didDestroy) {
- re.txDidDestroy(owner.cacheTimeMillis());
- }
- if (txEvent != null) {
- txEvent.addPut(putOp, owner, re, re.getKey(), newValue, aCallbackArgument);
- }
- re.setValueResultOfSearch(putOp.isNetSearch());
- try {
- processAndGenerateTXVersionTag(owner, callbackEvent, re, txEntryState);
- {
- re.setValue(owner,
- re.prepareValueForCache(owner, newValue, callbackEvent, !putOp.isCreate()));
- }
- if (putOp.isCreate()) {
- owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re));
- } else if (putOp.isUpdate()) {
- // Rahul : fix for 41694. Negative bucket size can also be
- // an issue with normal GFE Delta and will have to be fixed
- // in a similar manner and may be this fix the the one for
- // other delta can be combined.
- {
- owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(re));
- }
- }
- } catch (RegionClearedException rce) {
- clearOccured = true;
- }
- {
- long lastMod = owner.cacheTimeMillis();
- EntryLogger.logTXPut(_getOwnerObject(), key, nv);
- re.updateStatsForPut(lastMod, lastMod);
- owner.txApplyPutPart2(re, re.getKey(), lastMod, false, didDestroy, clearOccured);
- }
- } finally {
- if (re != null && owner.getIndexMaintenanceSynchronous()) {
- re.setUpdateInProgress(false);
- }
- }
- if (invokeCallbacks) {
- prepareUpdateCallbacks(pendingCallbacks, owner, hasRemoteOrigin, callbackEvent);
- result = true;
- }
- if (!clearOccured) {
- lruEntryUpdate(re);
+ private boolean doTxPutOnRegionEntryInMap(TxApplyPutContext txApplyPutContext) {
+ final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
+
+ synchronized (regionEntry) {
+ if (isRegionEntryRemoved(txApplyPutContext)) {
+ return false;
+ }
+ try {
+ txApplyPutRegionEntry(txApplyPutContext);
+ return true;
+ } finally {
+ if (!txApplyPutContext.isOpCompleted() && txApplyPutContext.isCreate()) {
+ removeEntry(txApplyPutContext.getEvent().getKey(), txApplyPutContext.getRegionEntry(),
+ false);
+ }
+ }
+ }
+ }
+
+ private boolean isRegionEntryRemoved(TxApplyPutContext txApplyPutContext) {
+ final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
+ if (regionEntry.isRemovedPhase2()) {
+ _getOwner().getCachePerfStats().incRetries();
+ getEntryMap().remove(txApplyPutContext.getEvent().getKey(), regionEntry);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void txApplyPutCreateNewEntryIfNeeded(TxApplyPutContext txApplyPutContext) {
+ txApplyPutContext.setCreate(txApplyPutContext.getRegionEntry() == null);
+ if (txApplyPutContext.isCreate()) {
+ final Object key = txApplyPutContext.getEvent().getKey();
+ RegionEntry newEntry = getEntryFactory().createEntry(_getOwner(), key, Token.REMOVED_PHASE1);
+ txApplyPutContext.setRegionEntry(newEntry);
+ }
+ }
+
+ private boolean txApplyPutFindExistingEntry(TxApplyPutContext txApplyPutContext,
+ boolean onlyExisting) {
+ RegionEntry re = getEntry(txApplyPutContext.getEvent());
+ if (onlyExisting) {
+ if (re == null || re.isRemoved()) {
+ return false;
+ }
+ }
+ txApplyPutContext.setRegionEntry(re);
+ return true;
+ }
+
+ private void txApplyPutRegionEntry(TxApplyPutContext txApplyPutContext) {
+ final Object key = txApplyPutContext.getEvent().getKey();
+ final boolean didDestroy = txApplyPutContext.isDidDestroy();
+ final TXRmtEvent txEvent = txApplyPutContext.getTxEvent();
+ final Object callbackArgument = txApplyPutContext.getCallbackArgument();
+ final List<EntryEventImpl> pendingCallbacks = txApplyPutContext.getPendingCallbacks();
+ final TXEntryState txEntryState = txApplyPutContext.getTxEntryState();
+ final LocalRegion owner = _getOwner();
+ final Object newValue = txApplyPutContext.getNewValue();
+ final boolean hasRemoteOrigin = txApplyPutContext.isHasRemoteOrigin();
+ final EntryEventImpl callbackEvent = txApplyPutContext.getEvent();
+ final boolean invokeCallbacks = txApplyPutContext.isInvokeCallbacks();
+ final RegionEntry regionEntry = txApplyPutContext.getRegionEntry();
+ final boolean onlyExisting = txApplyPutContext.isOnlyExisting();
+ final boolean isCreate = txApplyPutContext.isCreate();
+
+ if (isCreate) {
+ txApplyPutContext.makeCreate();
+ } else {
+ if (onlyExisting && regionEntry.isRemoved()) {
+ return;
+ }
+ txApplyPutContext.setOpCompleted(true);
+ if (!regionEntry.isRemoved()) {
+ txApplyPutContext.makeUpdate();
+ }
+ }
+ final int oldSize = isCreate ? 0 : owner.calculateRegionEntryValueSize(regionEntry);
+ final boolean oldIsRemoved = isCreate ? true : regionEntry.isDestroyedOrRemoved();
+ if (callbackEvent != null) {
+ callbackEvent.setRegionEntry(regionEntry);
+ callbackEvent.setOldValue(isCreate ? null : regionEntry.getValueInVM(owner));
+ }
+ boolean clearOccured = false;
+ if (owner.getIndexMaintenanceSynchronous()) {
+ regionEntry.setUpdateInProgress(true);
+ }
+ try {
+ txRemoveOldIndexEntry(txApplyPutContext.getPutOp(), regionEntry);
+ if (didDestroy) {
+ regionEntry.txDidDestroy(owner.cacheTimeMillis());
+ }
+ if (txEvent != null) {
+ txEvent.addPut(txApplyPutContext.getPutOp(), owner, regionEntry, regionEntry.getKey(),
+ newValue, callbackArgument);
+ }
+ regionEntry.setValueResultOfSearch(txApplyPutContext.getPutOp().isNetSearch());
+ try {
+ processAndGenerateTXVersionTag(owner, callbackEvent, regionEntry, txEntryState);
+ {
+ boolean wasTombstone = regionEntry.isTombstone();
+ regionEntry.setValue(owner, regionEntry.prepareValueForCache(owner, newValue,
+ callbackEvent, !txApplyPutContext.getPutOp().isCreate()));
+ if (wasTombstone) {
+ owner.unscheduleTombstone(regionEntry);
}
}
+ if (txApplyPutContext.getPutOp().isCreate()) {
+ owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(regionEntry));
+ } else if (txApplyPutContext.getPutOp().isUpdate()) {
+ owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(regionEntry));
+ }
+ } catch (RegionClearedException rce) {
+ clearOccured = true;
}
- if (didDestroy && !opCompleted) {
- owner.txApplyInvalidatePart2(re, re.getKey(), true, false /* clear */);
+ {
+ long lastMod = owner.cacheTimeMillis();
+ EntryLogger.logTXPut(_getOwnerObject(), key, newValue);
+ regionEntry.updateStatsForPut(lastMod, lastMod);
+ owner.txApplyPutPart2(regionEntry, regionEntry.getKey(), lastMod, isCreate, didDestroy,
+ clearOccured);
+ }
+ } finally {
+ if (regionEntry != null && owner.getIndexMaintenanceSynchronous()) {
+ regionEntry.setUpdateInProgress(false);
}
}
- if (invokeCallbacks && !opCompleted) {
- callbackEvent.makeUpdate();
- owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, callbackEvent, false);
+ if (isCreate) {
+ txApplyPutContext.setOpCompleted(true);
}
- if (owner.getConcurrencyChecksEnabled() && txEntryState != null && callbackEvent != null) {
- txEntryState.setVersionTag(callbackEvent.getVersionTag());
+ if (invokeCallbacks) {
+ if (isCreate) {
+ callbackEvent.makeCreate();
+ callbackEvent.setOldValue(null);
+ } else {
+ if (!oldIsRemoved) {
+ callbackEvent.makeUpdate();
+ }
+ }
+ switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
+ pendingCallbacks.add(callbackEvent);
+ txApplyPutContext.setCallbackEventInPending(true);
+ }
+ if (!clearOccured) {
+ if (isCreate) {
+ lruEntryCreate(regionEntry);
+ incEntryCount(1);
+ } else {
+ lruEntryUpdate(regionEntry);
+ }
}
- return result;
}
- private void prepareUpdateCallbacks(List<EntryEventImpl> pendingCallbacks, LocalRegion owner,
- boolean hasRemoteOrigin, EntryEventImpl callbackEvent) {
- callbackEvent.makeUpdate();
- switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
- pendingCallbacks.add(callbackEvent);
+ private static class TxApplyPutContext {
+ private boolean opCompleted;
+ private boolean callbackEventInPending;
+ private Operation putOp;
+ /**
+ * true if the regionEntry is one that we created;
+ * false if the regionEntry was an existing one.
+ */
+ private boolean create;
+ private RegionEntry regionEntry;
+ private final EntryEventImpl event;
+ private final boolean onlyExisting;
+ private final Object newValue;
+ private final boolean didDestroy;
+ private final TXRmtEvent txEvent;
+ private final Object callbackArgument;
+ private final List<EntryEventImpl> pendingCallbacks;
+ private final TXEntryState txEntryState;
+ private final boolean hasRemoteOrigin;
+ private final boolean invokeCallbacks;
+
+ public TxApplyPutContext(boolean opCompleted, boolean callbackEventInPending, Operation putOp,
+ EntryEventImpl event, boolean onlyExisting, Object newValue, boolean didDestroy,
+ TXRmtEvent txEvent, Object aCallbackArgument, List<EntryEventImpl> pendingCallbacks,
+ TXEntryState txEntryState, boolean hasRemoteOrigin, boolean invokeCallbacks) {
+ this.opCompleted = opCompleted;
+ this.callbackEventInPending = callbackEventInPending;
+ this.putOp = putOp;
+ this.regionEntry = null;
+ this.event = event;
+ this.onlyExisting = onlyExisting;
+ this.newValue = newValue;
+ this.didDestroy = didDestroy;
+ this.txEvent = txEvent;
+ this.callbackArgument = aCallbackArgument;
+ this.pendingCallbacks = pendingCallbacks;
+ this.txEntryState = txEntryState;
+ this.hasRemoteOrigin = hasRemoteOrigin;
+ this.invokeCallbacks = invokeCallbacks;
+ }
+
+ public boolean isOpCompleted() {
+ return opCompleted;
+ }
+
+ public void setOpCompleted(boolean opCompleted) {
+ this.opCompleted = opCompleted;
+ }
+
+ public boolean isCallbackEventInPending() {
+ return callbackEventInPending;
+ }
+
+ public void setCallbackEventInPending(boolean callbackEventInPending) {
+ this.callbackEventInPending = callbackEventInPending;
+ }
+
+ public Operation getPutOp() {
+ return putOp;
+ }
+
+ public boolean isCreate() {
+ return create;
+ }
+
+ public void setCreate(boolean create) {
+ this.create = create;
+ }
+
+ public RegionEntry getRegionEntry() {
+ return regionEntry;
+ }
+
+ public void setRegionEntry(RegionEntry regionEntry) {
+ this.regionEntry = regionEntry;
+ }
+
+ public EntryEventImpl getEvent() {
+ return event;
+ }
+
+ public boolean isOnlyExisting() {
+ return onlyExisting;
+ }
+
+ public Object getNewValue() {
+ return newValue;
+ }
+
+ public boolean isDidDestroy() {
+ return didDestroy;
+ }
+
+ public TXRmtEvent getTxEvent() {
+ return txEvent;
+ }
+
+ public Object getCallbackArgument() {
+ return callbackArgument;
+ }
+
+ public List<EntryEventImpl> getPendingCallbacks() {
+ return pendingCallbacks;
+ }
+
+ public TXEntryState getTxEntryState() {
+ return txEntryState;
+ }
+
+ public boolean isHasRemoteOrigin() {
+ return hasRemoteOrigin;
+ }
+
+ public boolean isInvokeCallbacks() {
+ return invokeCallbacks;
+ }
+
+ public void makeCreate() {
+ putOp = putOp.getCorrespondingCreateOp();
+ }
+
+ public void makeUpdate() {
+ putOp = putOp.getCorrespondingUpdateOp();
+ }
}
private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl callbackEvent,
@@ -2453,7 +2513,7 @@ public abstract class AbstractRegionMap
}
processVersionTag(re, callbackEvent);
} catch (ConcurrentCacheModificationException ignore) {
- // ignore this execption, however invoke callbacks for this operation
+ // ignore this exception, however invoke callbacks for this operation
}
// For distributed transactions, stuff the next region version generated
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 88f6359..420948c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5053,6 +5053,16 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
/**
+ * Called by AbstractRegionMap txApplyPut when it was told a destroy was also done
+ * by the transaction.
+ */
+ void txApplyPutHandleDidDestroy(Object key) {
+ if (this.entryUserAttributes != null) {
+ this.entryUserAttributes.remove(key);
+ }
+ }
+
+ /**
* Allows null as new value to accomodate create with a null value. Assumes all key, value, and
* callback validations have been performed.
*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java
index 6bd1839..f089db9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPut.java
@@ -320,7 +320,7 @@ public class RegionMapPut {
private boolean addRegionEntryToMapAndDoPut() {
synchronized (getRegionEntry()) {
putIfAbsentNewEntry();
- return doPutOnRegionEntry();
+ return doPutOnRegionEntryInMap();
}
}
@@ -335,9 +335,9 @@ public class RegionMapPut {
}
/**
- * @return false if caller should retry
+ * @return false if caller should retry because entry is no longer in the map
*/
- private boolean doPutOnRegionEntry() {
+ private boolean doPutOnRegionEntryInMap() {
final RegionEntry re = getRegionEntry();
synchronized (re) {
@@ -348,7 +348,7 @@ public class RegionMapPut {
setOldValueForDelta();
try {
setOldValueInEvent();
- doCreateOrUpdate();
+ doPutIfPreconditionsPass();
return true;
} finally {
OffHeapHelper.release(getOldValueForDelta());
@@ -384,36 +384,37 @@ public class RegionMapPut {
}
/**
- * @return false if an early out check indicated that
+ * @return false if precondition indicates that
* the put should not be done.
*/
- private boolean shouldPutContinue() {
+ private boolean checkPreconditions() {
if (continueUpdate() && continueOverwriteDestroyed() && satisfiesExpectedOldValue()) {
return true;
}
return false;
}
- private void doCreateOrUpdate() {
- if (!shouldPutContinue()) {
+ private void doPutIfPreconditionsPass() {
+ if (!checkPreconditions()) {
return;
}
invokeCacheWriter();
+ runWithIndexUpdatingInProgress(this::doPutAndDeliverEvent);
+ }
- runWithIndexUpdatingInProgress(() -> {
- final EntryEventImpl event = getEvent();
- createOrUpdateEntry();
- if (isUninitialized()) {
- event.inhibitCacheListenerNotification(true);
- }
- updateLru();
+ private void doPutAndDeliverEvent() {
+ final EntryEventImpl event = getEvent();
+ createOrUpdateEntry();
+ if (isUninitialized()) {
+ event.inhibitCacheListenerNotification(true);
+ }
+ updateLru();
- final RegionEntry re = getRegionEntry();
- long lastModTime = getOwner().basicPutPart2(event, re, !isUninitialized(),
- getLastModifiedTime(), getClearOccured());
- setLastModifiedTime(lastModTime);
- setCompleted(true);
- });
+ final RegionEntry re = getRegionEntry();
+ long lastModTime = getOwner().basicPutPart2(event, re, !isUninitialized(),
+ getLastModifiedTime(), getClearOccured());
+ setLastModifiedTime(lastModTime);
+ setCompleted(true);
}
private void runWithIndexUpdatingInProgress(Runnable r) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index 310e0a3..c01b4b4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -823,6 +823,9 @@ public class AbstractRegionMapTest {
protected TxTestableAbstractRegionMap() {
super(null);
LocalRegion owner = mock(LocalRegion.class);
+ KeyInfo keyInfo = mock(KeyInfo.class);
+ when(keyInfo.getKey()).thenReturn(KEY);
+ when(owner.getKeyInfo(eq(KEY), any(), any())).thenReturn(keyInfo);
when(owner.getMyId()).thenReturn(mock(InternalDistributedMember.class));
when(owner.getCache()).thenReturn(mock(InternalCache.class));
when(owner.isAllEvents()).thenReturn(true);
--
To stop receiving notification emails like this one, please contact
dschneider@apache.org.