You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/04/17 19:28:02 UTC
[geode] branch develop updated: Revert "GEODE-4957: fix race in concurrent create on region (#1750)" (#1812)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 33f8f5f Revert "GEODE-4957: fix race in concurrent create on region (#1750)" (#1812)
33f8f5f is described below
commit 33f8f5fadbe5655a31f2607dd121fc97e1a373f3
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Tue Apr 17 12:27:55 2018 -0700
Revert "GEODE-4957: fix race in concurrent create on region (#1750)" (#1812)
This reverts commit 65b52f28f4e11b9f5ec82b0e457fc0292132e5da.
---
.../geode/internal/cache/AbstractRegionMap.java | 576 ++++++++++-----------
.../apache/geode/internal/cache/LocalRegion.java | 5 +
.../internal/cache/map/RegionMapPutContext.java | 194 -------
.../internal/cache/AbstractRegionMapTest.java | 229 --------
4 files changed, 278 insertions(+), 726 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 0b2e68c..71ccf51 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
@@ -58,7 +59,6 @@ import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.map.CacheModificationLock;
import org.apache.geode.internal.cache.map.FocusedRegionMap;
import org.apache.geode.internal.cache.map.RegionMapDestroy;
-import org.apache.geode.internal.cache.map.RegionMapPutContext;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.region.entry.RegionEntryFactoryBuilder;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
@@ -72,6 +72,7 @@ import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.concurrent.MapCallbackAdapter;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -2176,305 +2177,279 @@ public abstract class AbstractRegionMap
}
}
+ private RegionEntry getOrCreateRegionEntry(Object ownerRegion, EntryEventImpl event, Object value,
+ MapCallbackAdapter<Object, Object, Object, Object> valueCreator, boolean onlyExisting,
+ boolean returnTombstone) {
+ Object key = event.getKey();
+ RegionEntry retVal = null;
+ retVal = getEntry(event);
+ if (onlyExisting) {
+ if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
+ return null;
+ }
+ return retVal;
+ }
+ if (retVal != null) {
+ return retVal;
+ }
+ if (valueCreator != null) {
+ value = valueCreator.newValue(key, ownerRegion, value, null);
+ }
+ retVal = getEntryFactory().createEntry((RegionEntryContext) ownerRegion, key, value);
+ RegionEntry oldRe = putEntryIfAbsent(key, retVal);
+ if (oldRe != null) {
+ if (retVal instanceof OffHeapRegionEntry) {
+ ((OffHeapRegionEntry) retVal).release();
+ }
+ return oldRe;
+ }
+ return retVal;
+ }
+
/*
* returns null if the operation fails
*/
@Override
- public RegionEntry basicPut(EntryEventImpl event, final long unused, final boolean ifNew,
- final boolean ifOld, final Object expectedOldValue, // only non-null if ifOld
- final boolean requireOldValue, final boolean overwriteDestroyed)
+ public RegionEntry basicPut(EntryEventImpl event, final long lastModified, final boolean ifNew,
+ final boolean ifOld, Object expectedOldValue, // only non-null if ifOld
+ boolean requireOldValue, final boolean overwriteDestroyed)
throws CacheWriterException, TimeoutException {
final LocalRegion owner = _getOwner();
- final RegionMapPutContext putInfo = new RegionMapPutContext(owner, event, ifNew, ifOld,
- overwriteDestroyed, requireOldValue, expectedOldValue);
entryEventSerialization.serializeNewValueIfNeeded(owner, event);
- runWhileLockedForCacheModification(event, () -> doBasicPut(putInfo));
- if (putInfo.isCompleted()) {
- return putInfo.getRegionEntry();
+
+ boolean clearOccured = false;
+ if (owner == null) {
+ // "fix" for bug 32440
+ Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
+ }
+ if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT_VERBOSE)
+ && !(owner instanceof HARegion)) {
+ logger.trace(LogMarker.LRU_TOMBSTONE_COUNT_VERBOSE,
+ "ARM.basicPut called for {} expectedOldValue={} requireOldValue={} ifNew={} ifOld={} initialized={} overwriteDestroyed={}",
+ event, expectedOldValue, requireOldValue, ifNew, ifOld, owner.isInitialized(),
+ overwriteDestroyed);
+ }
+
+ RegionEntry result = null;
+ long lastModifiedTime = 0;
+ // copy into local var to prevent race condition with setter
+ final CacheWriter cacheWriter = owner.basicGetWriter();
+ final boolean cacheWrite =
+ !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
+ && (cacheWriter != null || owner.hasServerProxy() || owner.scope.isDistributed());
+ /*
+ * For performance reason, we try to minimize object creation and do as much work as we can
+ * outside of synchronization, especially getting distribution advice.
+ */
+ final Set netWriteRecipients;
+ if (cacheWrite) {
+ if (cacheWriter == null && owner.scope.isDistributed()) {
+ netWriteRecipients =
+ ((DistributedRegion) owner).getCacheDistributionAdvisor().adviseNetWrite();
+ } else {
+ netWriteRecipients = null;
+ }
} else {
- return null;
+ netWriteRecipients = null;
}
- }
- private void runWhileLockedForCacheModification(EntryEventImpl event, Runnable r) {
- final LocalRegion owner = _getOwner();
+ // mbid: this has been added to maintain consistency between the disk region
+ // and the region map after clear() has been called. This will set the
+ // reference of the diskSegmentRegion as a ThreadLocal so that if the diskRegionSegment
+ // is later changed by another thread, we can do the necessary.
+ boolean uninitialized = !owner.isInitialized();
+ boolean retrieveOldValueForDelta =
+ event.getDeltaBytes() != null && event.getRawNewValue() == null;
+ IndexManager oqlIndexManager = null;
lockForCacheModification(owner, event);
try {
- r.run();
- } finally {
- releaseCacheModificationLock(owner, event);
- }
- }
-
- private void doBasicPut(final RegionMapPutContext putInfo) {
- try {
- doWithIndexInUpdateMode(() -> {
- do {
- putInfo.setRegionEntry(null);
- if (!findExistingEntry(putInfo)) {
- return;
- }
- createNewEntryIfNeeded(putInfo);
- } while (!addRegionEntryToMapAndDoPut(putInfo));
- });
- } catch (DiskAccessException dae) {
- _getOwner().handleDiskAccessException(dae);
- throw dae;
- } finally {
- doAfterPut(putInfo);
- }
- }
-
- private void doWithIndexInUpdateMode(Runnable r) {
- final IndexManager oqlIndexManager = getInitializedIndexManager();
- if (oqlIndexManager != null) {
try {
- r.run();
- } finally {
- oqlIndexManager.countDownIndexUpdaters();
- }
- } else {
- r.run();
- }
- }
+ // Fix for Bug #44431. We do NOT want to update the region and wait
+ // later for index INIT as region.clear() can cause inconsistency if
+ // happened in parallel as it also does index INIT.
+ oqlIndexManager = owner.getIndexManager();
+ if (oqlIndexManager != null) {
+ oqlIndexManager.waitForIndexInit();
+ }
- /**
- * Stores the found entry in putInfo.getRegionEntry.
- *
- * @return false if an existing entry was not found and this put requires
- * an existing one; otherwise returns true.
- */
- private boolean findExistingEntry(final RegionMapPutContext putInfo) {
- final Object key = putInfo.getEvent().getKey();
- RegionEntry re = getEntry(key);
- if (putInfo.isOnlyExisting()) {
- if (re == null || re.isTombstone()) {
- return false;
- }
- }
- putInfo.setRegionEntry(re);
- return true;
- }
+ // fix for bug #42169, replace must go to server if entry not on client
+ boolean replaceOnClient =
+ event.getOperation() == Operation.REPLACE && owner.getServerProxy() != null;
+ // Rather than having two different blocks for synchronizing oldRe
+ // and newRe, have only one block and synchronize re
+ RegionEntry re = null;
+ boolean eventRecorded = false;
+ boolean onlyExisting = ifOld && !replaceOnClient;
- /**
- * Stores the created entry in putInfo.getRegionEntry.
- */
- private void createNewEntryIfNeeded(final RegionMapPutContext putInfo) {
- putInfo.setCreate(putInfo.getRegionEntry() == null);
- if (putInfo.isCreate()) {
- final Object key = putInfo.getEvent().getKey();
- RegionEntry newEntry = getEntryFactory().createEntry(_getOwner(), key, Token.REMOVED_PHASE1);
- putInfo.setRegionEntry(newEntry);
- }
- }
+ re = getOrCreateRegionEntry(owner, event,
- /**
- * @return false if caller should retry
- */
- protected boolean addRegionEntryToMapAndDoPut(final RegionMapPutContext putInfo) {
- synchronized (putInfo.getRegionEntry()) {
- putIfAbsentNewEntry(putInfo);
- return doPutOnRegionEntry(putInfo);
- }
- }
+ Token.REMOVED_PHASE1, null, onlyExisting, false);
+ if (re == null) {
+ return null;
+ }
+ while (true) {
+ synchronized (re) {
+ // if the re goes into removed2 state, it will be removed
+ // from the map. otherwise we can append an event to it
+ // and change its state
+ if (re.isRemovedPhase2()) {
+ _getOwner().getCachePerfStats().incRetries();
+ getEntryMap().remove(event.getKey(), re);
+ re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, onlyExisting,
+ false);
+ if (re == null) {
+ // this will happen when onlyExisting is true
+ return null;
+ }
+ continue;
+ } else {
+ @Released
+ Object oldValueForDelta = null;
+ if (retrieveOldValueForDelta) {
+ // defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
+ final boolean disabled = disableLruUpdateCallback();
+ try {
+ // Old value is faulted in from disk if not found in memory.
+ oldValueForDelta = re.getValue(owner); // OFFHEAP: if we are synced on oldRe no
+ // issue since we can use ARE's ref
+ } finally {
+ if (disabled) {
+ enableLruUpdateCallback();
+ }
+ }
+ }
- protected void putIfAbsentNewEntry(final RegionMapPutContext putInfo) {
- if (putInfo.isCreate()) {
- RegionEntry oldRe = putEntryIfAbsent(putInfo.getEvent().getKey(), putInfo.getRegionEntry());
- if (oldRe != null) {
- putInfo.setCreate(false);
- putInfo.setRegionEntry(oldRe);
- }
- }
- }
+ try {
- /**
- * @return false if caller should retry
- */
- private boolean doPutOnRegionEntry(final RegionMapPutContext putInfo) {
- final RegionEntry re = putInfo.getRegionEntry();
+ event.setRegionEntry(re);
+ // set old value in event
+ setOldValueInEvent(event, re, cacheWrite, requireOldValue);
+ if (!continueUpdate(re, event, ifOld, replaceOnClient)) {
+ return null;
+ }
+ // overwrite destroyed?
+ if (!continueOverwriteDestroyed(re, event, overwriteDestroyed, ifNew)) {
+ return null;
+ }
+ // check expectedOldValue
+ if (!satisfiesExpectedOldValue(event, re, expectedOldValue, replaceOnClient)) {
+ return null;
+ }
+ // invoke cacheWriter
+ invokeCacheWriter(re, event, cacheWrite, cacheWriter, netWriteRecipients,
+ requireOldValue, expectedOldValue, replaceOnClient);
- synchronized (re) {
- if (isRegionEntryRemoved(putInfo)) {
- return false;
- }
+ // notify index of an update
+ notifyIndex(re, true);
+ try {
+ try {
+ if ((cacheWrite && event.getOperation().isUpdate()) // if there is a
+ // cacheWriter, type of
+ // event has already been
+ // set
+ || !re.isRemoved() || replaceOnClient) {
+ // update
+ updateEntry(event, requireOldValue, oldValueForDelta, re);
+ } else {
+ // create
+ createEntry(event, owner, re);
+ }
+ owner.recordEvent(event);
+ eventRecorded = true;
+ } catch (RegionClearedException rce) {
+ clearOccured = true;
+ owner.recordEvent(event);
+ } catch (ConcurrentCacheModificationException ccme) {
+ VersionTag tag = event.getVersionTag();
+ if (tag != null && tag.isTimeStampUpdated()) {
+ // Notify gateways of new time-stamp.
+ owner.notifyTimestampsToGateways(event);
+ }
+ throw ccme;
+ }
+ if (uninitialized) {
+ event.inhibitCacheListenerNotification(true);
+ }
+ updateLru(clearOccured, re, event);
- setOldValueForDelta(putInfo);
- try {
- setOldValueInEvent(putInfo);
- doCreateOrUpdate(putInfo);
- return true;
+ lastModifiedTime = owner.basicPutPart2(event, re, !uninitialized,
+ lastModifiedTime, clearOccured);
+ } finally {
+ notifyIndex(re, false);
+ }
+ result = re;
+ break;
+ } finally {
+ OffHeapHelper.release(oldValueForDelta);
+ if (re != null && !onlyExisting && !isOpComplete(re, event)) {
+ owner.cleanUpOnIncompleteOp(event, re);
+ } else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
+ BucketRegion br = (BucketRegion) owner;
+ CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
+ }
+ } // try
+ }
+ } // sync re
+ } // end while
+ } catch (DiskAccessException dae) {
+ // Asif:Feel that it is safe to destroy the region here as there appears
+ // to be no chance of deadlock during region destruction
+ result = null;
+ this._getOwner().handleDiskAccessException(dae);
+ throw dae;
} finally {
- OffHeapHelper.release(putInfo.getOldValueForDelta());
- putInfo.setOldValueForDelta(null);
- if (!putInfo.isCompleted() && putInfo.isCreate()) {
- // Region entry remove needs to be done while still synced on re.
- removeEntry(putInfo.getEvent().getKey(), re, false);
+ if (oqlIndexManager != null) {
+ oqlIndexManager.countDownIndexUpdaters();
}
- }
- }
- }
-
- private IndexManager getInitializedIndexManager() {
- IndexManager oqlIndexManager;
- // Fix for Bug #44431. We do NOT want to update the region and wait
- // later for index INIT as region.clear() can cause inconsistency if
- // happened in parallel as it also does index INIT.
- oqlIndexManager = _getOwner().getIndexManager();
- if (oqlIndexManager != null) {
- oqlIndexManager.waitForIndexInit();
- }
- return oqlIndexManager;
- }
-
- private void doAfterPut(RegionMapPutContext putInfo) {
- if (putInfo.isCompleted()) {
- final LocalRegion owner = _getOwner();
- try {
- final boolean invokeListeners = putInfo.getEvent().basicGetNewValue() != Token.TOMBSTONE;
- owner.basicPutPart3(putInfo.getEvent(), putInfo.getRegionEntry(),
- !putInfo.isUninitialized(), putInfo.getLastModifiedTime(), invokeListeners,
- putInfo.isIfNew(), putInfo.isIfOld(), putInfo.getExpectedOldValue(),
- putInfo.isRequireOldValue());
- } finally {
- if (!putInfo.getClearOccured()) {
+ if (result != null) {
try {
- lruUpdateCallback();
- } catch (DiskAccessException dae) {
- owner.handleDiskAccessException(dae);
- throw dae;
- }
+ // Note we do distribution after releasing all sync to avoid deadlock
+ final boolean invokeListeners = event.basicGetNewValue() != Token.TOMBSTONE;
+ owner.basicPutPart3(event, result, !uninitialized, lastModifiedTime, invokeListeners,
+ ifNew, ifOld, expectedOldValue, requireOldValue);
+ } finally {
+ // bug 32589, post update may throw an exception if exception occurs
+ // for any recipients
+ if (!clearOccured) {
+ try {
+ lruUpdateCallback();
+ } catch (DiskAccessException dae) {
+ // Asif:Feel that it is safe to destroy the region here as there appears
+ // to be no chance of deadlock during region destruction
+ result = null;
+ this._getOwner().handleDiskAccessException(dae);
+ throw dae;
+ }
+ }
+ } // finally
+ } else {
+ resetThreadLocals();
}
}
- } else {
- resetThreadLocals();
- }
- }
-
- /**
- * @return false if an early out check indicated that
- * the put should not be done.
- */
- private boolean shouldPutContinue(final RegionMapPutContext putInfo) {
- if (continueUpdate(putInfo) && continueOverwriteDestroyed(putInfo)
- && satisfiesExpectedOldValue(putInfo)) {
- return true;
- }
- return false;
- }
-
- private void doCreateOrUpdate(final RegionMapPutContext putInfo) {
- if (!shouldPutContinue(putInfo)) {
- return;
- }
- invokeCacheWriter(putInfo);
-
- runWithIndexUpdatingInProgress(putInfo, () -> {
- final EntryEventImpl event = putInfo.getEvent();
- createOrUpdateEntry(putInfo);
- if (putInfo.isUninitialized()) {
- event.inhibitCacheListenerNotification(true);
- }
- updateLru(putInfo);
-
- final RegionEntry re = putInfo.getRegionEntry();
- long lastModTime = _getOwner().basicPutPart2(event, re, !putInfo.isUninitialized(),
- putInfo.getLastModifiedTime(), putInfo.getClearOccured());
- putInfo.setLastModifiedTime(lastModTime);
- putInfo.setCompleted(true);
- });
- }
-
- private void runWithIndexUpdatingInProgress(RegionMapPutContext putInfo, Runnable r) {
- final RegionEntry re = putInfo.getRegionEntry();
- notifyIndex(re, true);
- try {
- r.run();
- } finally {
- notifyIndex(re, false);
- }
- }
-
- private void createOrUpdateEntry(final RegionMapPutContext putInfo) {
- final EntryEventImpl event = putInfo.getEvent();
- final LocalRegion owner = _getOwner();
- try {
- if (isUpdate(putInfo)) {
- updateEntry(putInfo);
- } else {
- createEntry(putInfo);
- }
- owner.recordEvent(event);
- } catch (RegionClearedException rce) {
- putInfo.setClearOccured(true);
- owner.recordEvent(event);
- } catch (ConcurrentCacheModificationException ccme) {
- VersionTag tag = event.getVersionTag();
- if (tag != null && tag.isTimeStampUpdated()) {
- owner.notifyTimestampsToGateways(event);
- }
- throw ccme;
- }
- }
-
- private boolean isUpdate(final RegionMapPutContext putInfo) {
- if (putInfo.isCacheWrite() && putInfo.getEvent().getOperation().isUpdate()) {
- // if there is a cacheWriter, type of event has already been set
- return true;
- }
- if (putInfo.isReplaceOnClient()) {
- return true;
- }
- if (!putInfo.getRegionEntry().isRemoved()) {
- return true;
- }
- return false;
- }
-
- private void setOldValueForDelta(final RegionMapPutContext putInfo) {
- if (putInfo.isRetrieveOldValueForDelta()) {
- runWhileEvictionDisabled(() -> {
- // Old value is faulted in from disk if not found in memory.
- putInfo.setOldValueForDelta(putInfo.getRegionEntry().getValue(_getOwner()));
- // OFFHEAP: if we are synced on region entry no issue since we can use ARE's ref
- });
- }
- }
-
- private void runWhileEvictionDisabled(Runnable r) {
- final boolean disabled = disableLruUpdateCallback();
- try {
- r.run();
} finally {
- if (disabled) {
- enableLruUpdateCallback();
- }
+ releaseCacheModificationLock(owner, event);
}
+ return result;
}
/**
- * If the re goes into removed2 state, it will be removed from the map.
- *
- * @return true if re was remove phase 2
+ * If the value in the VM is still REMOVED_PHASE1 Token, then the operation was not completed (due
+ * to cacheWriter exception, concurrentMap operation) etc.
*/
- private boolean isRegionEntryRemoved(final RegionMapPutContext putInfo) {
- final RegionEntry re = putInfo.getRegionEntry();
- if (re.isRemovedPhase2()) {
- _getOwner().getCachePerfStats().incRetries();
- getEntryMap().remove(putInfo.getEvent().getKey(), re);
- return true;
- } else {
+ private boolean isOpComplete(RegionEntry re, EntryEventImpl event) {
+ if (re.getValueAsToken() == Token.REMOVED_PHASE1) {
return false;
}
+ return true;
}
- private boolean satisfiesExpectedOldValue(final RegionMapPutContext putInfo) {
+ private boolean satisfiesExpectedOldValue(EntryEventImpl event, RegionEntry re,
+ Object expectedOldValue, boolean replaceOnClient) {
// replace is propagated to server, so no need to check
// satisfiesOldValue on client
- final EntryEventImpl event = putInfo.getEvent();
- if (putInfo.getExpectedOldValue() != null && !putInfo.isReplaceOnClient()) {
+ if (expectedOldValue != null && !replaceOnClient) {
assert event.getOperation().guaranteesOldValue();
// We already called setOldValueInEvent so the event will have the old value.
@Unretained
@@ -2482,8 +2457,7 @@ public abstract class AbstractRegionMap
// Note that v will be null instead of INVALID because setOldValue
// converts INVALID to null.
// But checkExpectedOldValue handle this and says INVALID equals null.
- if (!AbstractRegionEntry.checkExpectedOldValue(putInfo.getExpectedOldValue(), v,
- event.getRegion())) {
+ if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v, event.getRegion())) {
return false;
}
}
@@ -2491,12 +2465,10 @@ public abstract class AbstractRegionMap
}
// PRECONDITION: caller must be synced on re
- private void setOldValueInEvent(final RegionMapPutContext putInfo) {
- final EntryEventImpl event = putInfo.getEvent();
- final RegionEntry re = putInfo.getRegionEntry();
- event.setRegionEntry(re);
- boolean needToSetOldValue = putInfo.isCacheWrite() || putInfo.isRequireOldValue()
- || event.getOperation().guaranteesOldValue();
+ private void setOldValueInEvent(EntryEventImpl event, RegionEntry re, boolean cacheWrite,
+ boolean requireOldValue) {
+ boolean needToSetOldValue =
+ cacheWrite || requireOldValue || event.getOperation().guaranteesOldValue();
if (needToSetOldValue) {
if (event.getOperation().guaranteesOldValue()) {
// In these cases we want to even get the old value from disk if it is not in memory
@@ -2542,10 +2514,8 @@ public abstract class AbstractRegionMap
}
}
- private void createEntry(final RegionMapPutContext putInfo) throws RegionClearedException {
- final LocalRegion owner = _getOwner();
- final EntryEventImpl event = putInfo.getEvent();
- final RegionEntry re = putInfo.getRegionEntry();
+ protected void createEntry(EntryEventImpl event, final LocalRegion owner, RegionEntry re)
+ throws RegionClearedException {
final boolean wasTombstone = re.isTombstone();
processVersionTag(re, event);
event.putNewEntry(owner, re);
@@ -2555,24 +2525,22 @@ public abstract class AbstractRegionMap
}
}
- private void updateEntry(final RegionMapPutContext putInfo) throws RegionClearedException {
- final EntryEventImpl event = putInfo.getEvent();
- final RegionEntry re = putInfo.getRegionEntry();
- final boolean wasTombstone = re.isTombstone();
+ protected void updateEntry(EntryEventImpl event, boolean requireOldValue, Object oldValueForDelta,
+ RegionEntry re) throws RegionClearedException {
final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
+ final boolean wasTombstone = re.isTombstone();
processVersionTag(re, event);
- event.putExistingEntry(event.getRegion(), re, putInfo.isRequireOldValue(),
- putInfo.getOldValueForDelta());
+ event.putExistingEntry(event.getRegion(), re, requireOldValue, oldValueForDelta);
EntryLogger.logPut(event);
updateSize(event, oldSize, true/* isUpdate */, wasTombstone);
}
- private void updateLru(final RegionMapPutContext putInfo) {
- if (!putInfo.getClearOccured()) {
- if (putInfo.getEvent().getOperation().isCreate()) {
- lruEntryCreate(putInfo.getRegionEntry());
+ private void updateLru(boolean clearOccured, RegionEntry re, EntryEventImpl event) {
+ if (!clearOccured) {
+ if (event.getOperation().isCreate()) {
+ lruEntryCreate(re);
} else {
- lruEntryUpdate(putInfo.getRegionEntry());
+ lruEntryUpdate(re);
}
}
}
@@ -2595,50 +2563,52 @@ public abstract class AbstractRegionMap
}
}
- private void invokeCacheWriter(RegionMapPutContext putInfo) {
- final EntryEventImpl event = putInfo.getEvent();
+ private void invokeCacheWriter(RegionEntry re, EntryEventImpl event, boolean cacheWrite,
+ CacheWriter cacheWriter, Set netWriteRecipients, boolean requireOldValue,
+ Object expectedOldValue, boolean replaceOnClient) {
// invoke listeners only if region is initialized
- if (_getOwner().isInitialized() && putInfo.isCacheWrite()) {
+ if (_getOwner().isInitialized() && cacheWrite) {
// event.setOldValue already called in setOldValueInEvent
// bug #42638 for replaceOnClient, do not make the event create
// or update since replace must propagate to server
- if (!putInfo.isReplaceOnClient()) {
- if (putInfo.getRegionEntry().isDestroyedOrRemoved()) {
+ if (!replaceOnClient) {
+ if (re.isDestroyedOrRemoved()) {
event.makeCreate();
} else {
event.makeUpdate();
}
}
- _getOwner().cacheWriteBeforePut(event, putInfo.getNetWriteRecipients(),
- putInfo.getCacheWriter(), putInfo.isRequireOldValue(), putInfo.getExpectedOldValue());
+ _getOwner().cacheWriteBeforePut(event, netWriteRecipients, cacheWriter, requireOldValue,
+ expectedOldValue);
}
- if (!_getOwner().isInitialized() && !putInfo.isCacheWrite()) {
- // block setting of old value in putNewValueNoSync, don't need it
+ if (!_getOwner().isInitialized() && !cacheWrite) {
+ // block setting of old value in putNewValueNoSync, don't
+ // need it
event.oldValueNotAvailable();
}
}
- private boolean continueOverwriteDestroyed(final RegionMapPutContext putInfo) {
- Token oldValueInVM = putInfo.getRegionEntry().getValueAsToken();
+ private boolean continueOverwriteDestroyed(RegionEntry re, EntryEventImpl event,
+ boolean overwriteDestroyed, boolean ifNew) {
+ Token oldValueInVM = re.getValueAsToken();
// if region is under GII, check if token is destroyed
- if (!putInfo.isOverwriteDestroyed()) {
+ if (!overwriteDestroyed) {
if (!_getOwner().isInitialized()
&& (oldValueInVM == Token.DESTROYED || oldValueInVM == Token.TOMBSTONE)) {
- putInfo.getEvent().setOldValueDestroyedToken();
+ event.setOldValueDestroyedToken();
return false;
}
}
- if (putInfo.isIfNew() && !Token.isRemoved(oldValueInVM)) {
+ if (ifNew && !Token.isRemoved(oldValueInVM)) {
return false;
}
return true;
}
- private boolean continueUpdate(final RegionMapPutContext putInfo) {
- if (putInfo.isIfOld()) {
- final EntryEventImpl event = putInfo.getEvent();
- final RegionEntry re = putInfo.getRegionEntry();
+ private boolean continueUpdate(RegionEntry re, EntryEventImpl event, boolean ifOld,
+ boolean replaceOnClient) {
+ if (ifOld) {
// only update, so just do tombstone maintainence and exit
if (re.isTombstone() && event.getVersionTag() != null) {
// refresh the tombstone so it doesn't time out too soon
@@ -2652,7 +2622,7 @@ public abstract class AbstractRegionMap
_getOwner().rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
return false;
}
- if (re.isRemoved() && !putInfo.isReplaceOnClient()) {
+ if (re.isRemoved() && !replaceOnClient) {
return false;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 3aab945..7dcfd90 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -7310,6 +7310,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
+ void cleanUpOnIncompleteOp(EntryEventImpl event, RegionEntry regionEntry) {
+ // Ok to remove entry as index has not been modified yet by the operation
+ this.entries.removeEntry(event.getKey(), regionEntry, false);
+ }
+
public static void validateRegionName(String name, InternalRegionArguments internalRegionArgs) {
if (name == null) {
throw new IllegalArgumentException(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java
deleted file mode 100644
index f8a2778..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.internal.cache.map;
-
-import java.util.Set;
-
-import org.apache.geode.cache.CacheWriter;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.cache.DistributedRegion;
-import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.RegionEntry;
-import org.apache.geode.internal.offheap.annotations.Released;
-
-/**
- * This class is used to hold a bunch of immutable items
- * used by AbstractRegionMap when doing a "put" operation.
- * It also has a few mutable items updates while a "put"
- * operation is being done.
- */
-public class RegionMapPutContext {
- private final EntryEventImpl event;
- private final boolean ifNew;
- private final boolean ifOld;
- private final boolean overwriteDestroyed;
- private final boolean requireOldValue;
- private final boolean uninitialized;
- private final boolean retrieveOldValueForDelta;
- private final boolean replaceOnClient;
- private final boolean onlyExisting;
- private final boolean cacheWrite;
- private final CacheWriter cacheWriter;
- private final Set netWriteRecipients;
- private final Object expectedOldValue;
- private boolean clearOccured;
- private long lastModifiedTime;
- private RegionEntry regionEntry;
- private boolean create;
- private boolean completed;
- @Released
- private Object oldValueForDelta;
-
- public RegionMapPutContext(LocalRegion owner, EntryEventImpl event, boolean ifNew, boolean ifOld,
- boolean overwriteDestroyed, boolean requireOldValue,
-
- Object expectedOldValue) {
- if (owner == null) {
- // "fix" for bug 32440
- Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
- }
- this.event = event;
- this.ifNew = ifNew;
- this.ifOld = ifOld;
- this.overwriteDestroyed = overwriteDestroyed;
- this.requireOldValue = requireOldValue;
- this.uninitialized = !owner.isInitialized();
- this.retrieveOldValueForDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null;
- this.replaceOnClient =
- event.getOperation() == Operation.REPLACE && owner.getServerProxy() != null;
- this.onlyExisting = ifOld && !isReplaceOnClient();
- this.cacheWriter = owner.basicGetWriter();
- this.cacheWrite = !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
- && (getCacheWriter() != null || owner.hasServerProxy() || owner.getScope().isDistributed());
- this.expectedOldValue = expectedOldValue;
- if (isCacheWrite()) {
- if (getCacheWriter() == null && owner.getScope().isDistributed()) {
- this.netWriteRecipients =
- ((DistributedRegion) owner).getCacheDistributionAdvisor().adviseNetWrite();
- } else {
- this.netWriteRecipients = null;
- }
- } else {
- this.netWriteRecipients = null;
- }
- }
-
- public boolean isIfNew() {
- return ifNew;
- }
-
- public boolean isIfOld() {
- return ifOld;
- }
-
- public boolean isOverwriteDestroyed() {
- return overwriteDestroyed;
- }
-
- public boolean isRequireOldValue() {
- return requireOldValue;
- }
-
- public boolean isUninitialized() {
- return uninitialized;
- }
-
- public boolean isRetrieveOldValueForDelta() {
- return retrieveOldValueForDelta;
- }
-
- public boolean isReplaceOnClient() {
- return replaceOnClient;
- }
-
- public boolean isOnlyExisting() {
- return onlyExisting;
- }
-
- public boolean isCacheWrite() {
- return cacheWrite;
- }
-
- public CacheWriter getCacheWriter() {
- return cacheWriter;
- }
-
- public Set getNetWriteRecipients() {
- return netWriteRecipients;
- }
-
- public Object getExpectedOldValue() {
- return expectedOldValue;
- }
-
- public boolean getClearOccured() {
- return clearOccured;
- }
-
- public void setClearOccured(boolean clearOccured) {
- this.clearOccured = clearOccured;
- }
-
- public long getLastModifiedTime() {
- return lastModifiedTime;
- }
-
- public void setLastModifiedTime(long lastModifiedTime) {
- this.lastModifiedTime = lastModifiedTime;
- }
-
- public RegionEntry getRegionEntry() {
- return regionEntry;
- }
-
- public void setRegionEntry(RegionEntry regionEntry) {
- this.regionEntry = regionEntry;
- }
-
- /**
- * @return true if put created a new entry; false if it updated an existing one.
- */
- public boolean isCreate() {
- return create;
- }
-
- public void setCreate(boolean v) {
- this.create = v;
- }
-
- public EntryEventImpl getEvent() {
- return event;
- }
-
- public boolean isCompleted() {
- return this.completed;
- }
-
- public void setCompleted(boolean b) {
- this.completed = b;
- }
-
- public Object getOldValueForDelta() {
- return this.oldValueForDelta;
- }
-
- public void setOldValueForDelta(Object value) {
- this.oldValueForDelta = value;
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index a38d090..d3053b0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -20,10 +20,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -32,11 +30,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -45,12 +38,10 @@ import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.eviction.EvictableEntry;
import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.eviction.EvictionCounters;
-import org.apache.geode.internal.cache.map.RegionMapPutContext;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionHolder;
import org.apache.geode.internal.cache.versions.VersionTag;
@@ -114,7 +105,6 @@ public class AbstractRegionMapTest {
// invalidate on region that is not initialized should create
// entry in map as invalid.
- when(arm._getOwner().isInitialized()).thenReturn(false);
assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
.isInstanceOf(EntryNotFoundException.class);
@@ -132,7 +122,6 @@ public class AbstractRegionMapTest {
// invalidate on region that is not initialized should create
// entry in map as invalid.
- when(arm._getOwner().isInitialized()).thenReturn(false);
assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
.isInstanceOf(EntryNotFoundException.class);
@@ -169,7 +158,6 @@ public class AbstractRegionMapTest {
// invalidate on region that is not initialized should create
// entry in map as invalid.
- when(arm._getOwner().isInitialized()).thenReturn(false);
assertTrue(arm.invalidate(event, true, true, false));
verify(arm._getOwner(), times(1)).basicInvalidatePart2(any(), any(), anyBoolean(),
anyBoolean());
@@ -740,139 +728,6 @@ public class AbstractRegionMapTest {
assertEquals(re.getValueAsToken(), token);
}
- @Test
- public void updateOnEmptyMapReturnsNull() {
- final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
- final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
-
- RegionEntry result = arm.basicPut(event, 0L, false, true, null, false, false);
-
- assertThat(result).isNull();
- verify(arm._getOwner(), never()).basicPutPart2(any(), any(), anyBoolean(), anyLong(),
- anyBoolean());
- verify(arm._getOwner(), never()).basicPutPart3(any(), any(), anyBoolean(), anyLong(),
- anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
- }
-
- @Test
- public void createOnExistingEntryReturnsNull() {
- final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
- // do a create to get a region entry in the map
- arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true, false, null, false, false);
- final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
- final boolean ifNew = true;
- final boolean ifOld = false;
- final boolean requireOldValue = false;
- final Object expectedOldValue = null;
-
- RegionEntry result =
- arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
- assertThat(result).isNull();
- verify(arm._getOwner(), never()).basicPutPart2(eq(event), any(), anyBoolean(), anyLong(),
- anyBoolean());
- verify(arm._getOwner(), never()).basicPutPart3(eq(event), any(), anyBoolean(), anyLong(),
- anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
- }
-
- @Test
- public void createOnEntryReturnedFromPutIfAbsentDoesNothing() {
- CustomEntryConcurrentHashMap<String, RegionEntry> map =
- mock(CustomEntryConcurrentHashMap.class);
- RegionEntry entry = mock(RegionEntry.class);
- when(entry.getValueAsToken()).thenReturn(Token.NOT_A_TOKEN);
- when(map.get(KEY)).thenReturn(null);
- when(map.putIfAbsent((String) eq(KEY), any())).thenReturn(entry);
- final TestableAbstractRegionMap arm = new TestableAbstractRegionMap(false, map, null);
- final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
- final boolean ifNew = true;
- final boolean ifOld = false;
- final boolean requireOldValue = false;
- final Object expectedOldValue = null;
-
- RegionEntry result =
- arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
- assertThat(result).isNull();
- verify(arm._getOwner(), never()).basicPutPart2(eq(event), any(), anyBoolean(), anyLong(),
- anyBoolean());
- verify(arm._getOwner(), never()).basicPutPart3(eq(event), any(), anyBoolean(), anyLong(),
- anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
- }
-
- @Test
- public void createOnExistingEntryWithRemovePhase2DoesCreate() throws RegionClearedException {
- final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
- // do a create to get a region entry in the map
- RegionEntry createdEntry = arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true,
- false, null, false, false);
- createdEntry.setValue(null, Token.REMOVED_PHASE2);
- final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
- event.setNewValue("create");
- final boolean ifNew = true;
- final boolean ifOld = false;
- final boolean requireOldValue = false;
- final Object expectedOldValue = null;
-
- RegionEntry result =
- arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
-
- assertThat(result).isNotNull();
- assertThat(result).isNotSameAs(createdEntry);
- assertThat(result.getKey()).isEqualTo("key");
- assertThat(result.getValue()).isEqualTo("create");
- verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
- eq(false));
- verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
- eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
- }
-
-
- @Test
- public void updateOnExistingEntryDoesUpdate() {
- final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
- // do a create to get a region entry in the map
- RegionEntry createdEntry = arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true,
- false, null, false, false);
- final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
- event.setNewValue("update");
- final boolean ifNew = false;
- final boolean ifOld = true;
- final boolean requireOldValue = false;
- final Object expectedOldValue = null;
-
- RegionEntry result =
- arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
-
- assertThat(result).isSameAs(createdEntry);
- assertThat(result.getKey()).isEqualTo("key");
- assertThat(result.getValue()).isEqualTo("update");
- verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
- eq(false));
- verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
- eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
- }
-
- @Test
- public void createOnEmptyMapAddsEntry() {
- final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
- final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
- event.setNewValue("value");
- final boolean ifNew = true;
- final boolean ifOld = false;
- final boolean requireOldValue = false;
- final Object expectedOldValue = null;
-
- RegionEntry result =
- arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
-
- assertThat(result).isNotNull();
- assertThat(result.getKey()).isEqualTo("key");
- assertThat(result.getValue()).isEqualTo("value");
- verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
- eq(false));
- verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
- eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
- }
-
/**
* TestableAbstractRegionMap
*/
@@ -894,8 +749,6 @@ public class AbstractRegionMapTest {
when(owner.getCachePerfStats()).thenReturn(cachePerfStats);
when(owner.getConcurrencyChecksEnabled()).thenReturn(withConcurrencyChecks);
when(owner.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
- when(owner.getScope()).thenReturn(Scope.LOCAL);
- when(owner.isInitialized()).thenReturn(true);
doThrow(EntryNotFoundException.class).when(owner).checkEntryNotFound(any());
initialize(owner, new Attributes(), null, false);
if (map != null) {
@@ -907,88 +760,6 @@ public class AbstractRegionMapTest {
}
}
- @Test
- public void verifyConcurrentCreateHasCorrectResult() throws Exception {
- CountDownLatch firstCreateAddedUninitializedEntry = new CountDownLatch(1);
- CountDownLatch secondCreateFoundFirstCreatesEntry = new CountDownLatch(1);
- TestableBasicPutMap arm = new TestableBasicPutMap(firstCreateAddedUninitializedEntry,
- secondCreateFoundFirstCreatesEntry);
- // The key needs to be long enough to not be stored inline on the region entry.
- String key1 = "lonGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGkey";
- String key2 = new String(key1);
-
- Future<RegionEntry> future = doFirstCreateInAnotherThread(arm, key1);
- if (!firstCreateAddedUninitializedEntry.await(5, TimeUnit.SECONDS)) {
- // something is wrong with the other thread
- // so count down the latch it may be waiting
- // on and then call get to see what went wrong with him.
- secondCreateFoundFirstCreatesEntry.countDown();
- fail("other thread took too long. It returned " + future.get());
- }
- EntryEventImpl event = createEventForCreate(arm._getOwner(), key2);
- // now do the second create
- RegionEntry result = arm.basicPut(event, 0L, true, false, null, false, false);
-
- RegionEntry resultFromOtherThread = future.get();
-
- assertThat(result).isNull();
- assertThat(resultFromOtherThread).isNotNull();
- assertThat(resultFromOtherThread.getKey()).isSameAs(key1);
- }
-
- private EntryEventImpl createEventForCreate(LocalRegion lr, String key) {
- when(lr.getKeyInfo(key)).thenReturn(new KeyInfo(key, null, null));
- EntryEventImpl event =
- EntryEventImpl.create(lr, Operation.CREATE, key, false, null, true, false);
- event.setNewValue("create_value");
- return event;
- }
-
- private Future<RegionEntry> doFirstCreateInAnotherThread(TestableBasicPutMap arm, String key) {
- Future<RegionEntry> result = CompletableFuture.supplyAsync(() -> {
- EntryEventImpl event = createEventForCreate(arm._getOwner(), key);
- return arm.basicPut(event, 0L, true, false, null, false, false);
- });
- return result;
- }
-
- private static class TestableBasicPutMap extends TestableAbstractRegionMap {
- private final CountDownLatch firstCreateAddedUninitializedEntry;
- private final CountDownLatch secondCreateFoundFirstCreatesEntry;
- private boolean alreadyCalledPutIfAbsentNewEntry;
- private boolean alreadyCalledAddRegionEntryToMapAndDoPut;
-
- public TestableBasicPutMap(CountDownLatch removePhase1Completed,
- CountDownLatch secondCreateFoundFirstCreatesEntry) {
- super();
- this.firstCreateAddedUninitializedEntry = removePhase1Completed;
- this.secondCreateFoundFirstCreatesEntry = secondCreateFoundFirstCreatesEntry;
- }
-
- @Override
- protected boolean addRegionEntryToMapAndDoPut(final RegionMapPutContext putInfo) {
- if (!alreadyCalledAddRegionEntryToMapAndDoPut) {
- alreadyCalledAddRegionEntryToMapAndDoPut = true;
- } else {
- this.secondCreateFoundFirstCreatesEntry.countDown();
- }
- return super.addRegionEntryToMapAndDoPut(putInfo);
- }
-
- @Override
- protected void putIfAbsentNewEntry(final RegionMapPutContext putInfo) {
- super.putIfAbsentNewEntry(putInfo);
- if (!alreadyCalledPutIfAbsentNewEntry) {
- alreadyCalledPutIfAbsentNewEntry = true;
- this.firstCreateAddedUninitializedEntry.countDown();
- try {
- this.secondCreateFoundFirstCreatesEntry.await(5, TimeUnit.SECONDS);
- } catch (InterruptedException ignore) {
- }
- }
- }
- }
-
/**
* TestableVMLRURegionMap
*/
--
To stop receiving notification emails like this one, please contact
dschneider@apache.org.