You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/04/12 18:34:09 UTC
[geode] branch develop updated: GEODE-4957: fix race in concurrent
create on region (#1750)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 65b52f2 GEODE-4957: fix race in concurrent create on region (#1750)
65b52f2 is described below
commit 65b52f28f4e11b9f5ec82b0e457fc0292132e5da
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Apr 12 11:34:02 2018 -0700
GEODE-4957: fix race in concurrent create on region (#1750)
Refactored basicPut so that a RegionEntry with REMOVE_PHASE1
is added to the map, it will already be synchronized.
This prevents a second concurrent threads from "stealing" it and
creating a region entry with a key that is the same but has a different
identity.
---
.../geode/internal/cache/AbstractRegionMap.java | 576 +++++++++++----------
.../apache/geode/internal/cache/LocalRegion.java | 5 -
.../internal/cache/map/RegionMapPutContext.java | 194 +++++++
.../internal/cache/AbstractRegionMapTest.java | 231 +++++++++
4 files changed, 728 insertions(+), 278 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 493410b..2c57182 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -29,7 +29,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.cache.CacheEvent;
-import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
@@ -59,6 +58,7 @@ import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.map.CacheModificationLock;
import org.apache.geode.internal.cache.map.FocusedRegionMap;
import org.apache.geode.internal.cache.map.RegionMapDestroy;
+import org.apache.geode.internal.cache.map.RegionMapPutContext;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.region.entry.RegionEntryFactoryBuilder;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
@@ -72,7 +72,6 @@ import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
-import org.apache.geode.internal.concurrent.MapCallbackAdapter;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -2165,279 +2164,305 @@ public abstract class AbstractRegionMap
}
}
- private RegionEntry getOrCreateRegionEntry(Object ownerRegion, EntryEventImpl event, Object value,
- MapCallbackAdapter<Object, Object, Object, Object> valueCreator, boolean onlyExisting,
- boolean returnTombstone) {
- Object key = event.getKey();
- RegionEntry retVal = null;
- retVal = getEntry(event);
- if (onlyExisting) {
- if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
- return null;
- }
- return retVal;
- }
- if (retVal != null) {
- return retVal;
- }
- if (valueCreator != null) {
- value = valueCreator.newValue(key, ownerRegion, value, null);
- }
- retVal = getEntryFactory().createEntry((RegionEntryContext) ownerRegion, key, value);
- RegionEntry oldRe = putEntryIfAbsent(key, retVal);
- if (oldRe != null) {
- if (retVal instanceof OffHeapRegionEntry) {
- ((OffHeapRegionEntry) retVal).release();
- }
- return oldRe;
- }
- return retVal;
- }
-
/*
* returns null if the operation fails
*/
@Override
- public RegionEntry basicPut(EntryEventImpl event, final long lastModified, final boolean ifNew,
- final boolean ifOld, Object expectedOldValue, // only non-null if ifOld
- boolean requireOldValue, final boolean overwriteDestroyed)
+ public RegionEntry basicPut(EntryEventImpl event, final long unused, final boolean ifNew,
+ final boolean ifOld, final Object expectedOldValue, // only non-null if ifOld
+ final boolean requireOldValue, final boolean overwriteDestroyed)
throws CacheWriterException, TimeoutException {
final LocalRegion owner = _getOwner();
+ final RegionMapPutContext putInfo = new RegionMapPutContext(owner, event, ifNew, ifOld,
+ overwriteDestroyed, requireOldValue, expectedOldValue);
entryEventSerialization.serializeNewValueIfNeeded(owner, event);
-
- boolean clearOccured = false;
- if (owner == null) {
- // "fix" for bug 32440
- Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
- }
- if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT_VERBOSE)
- && !(owner instanceof HARegion)) {
- logger.trace(LogMarker.LRU_TOMBSTONE_COUNT_VERBOSE,
- "ARM.basicPut called for {} expectedOldValue={} requireOldValue={} ifNew={} ifOld={} initialized={} overwriteDestroyed={}",
- event, expectedOldValue, requireOldValue, ifNew, ifOld, owner.isInitialized(),
- overwriteDestroyed);
- }
-
- RegionEntry result = null;
- long lastModifiedTime = 0;
- // copy into local var to prevent race condition with setter
- final CacheWriter cacheWriter = owner.basicGetWriter();
- final boolean cacheWrite =
- !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
- && (cacheWriter != null || owner.hasServerProxy() || owner.scope.isDistributed());
- /*
- * For performance reason, we try to minimize object creation and do as much work as we can
- * outside of synchronization, especially getting distribution advice.
- */
- final Set netWriteRecipients;
- if (cacheWrite) {
- if (cacheWriter == null && owner.scope.isDistributed()) {
- netWriteRecipients =
- ((DistributedRegion) owner).getCacheDistributionAdvisor().adviseNetWrite();
- } else {
- netWriteRecipients = null;
- }
+ runWhileLockedForCacheModification(event, () -> doBasicPut(putInfo));
+ if (putInfo.isCompleted()) {
+ return putInfo.getRegionEntry();
} else {
- netWriteRecipients = null;
+ return null;
}
+ }
- // mbid: this has been added to maintain consistency between the disk region
- // and the region map after clear() has been called. This will set the
- // reference of the diskSegmentRegion as a ThreadLocal so that if the diskRegionSegment
- // is later changed by another thread, we can do the necessary.
- boolean uninitialized = !owner.isInitialized();
- boolean retrieveOldValueForDelta =
- event.getDeltaBytes() != null && event.getRawNewValue() == null;
- IndexManager oqlIndexManager = null;
+ private void runWhileLockedForCacheModification(EntryEventImpl event, Runnable r) {
+ final LocalRegion owner = _getOwner();
lockForCacheModification(owner, event);
try {
+ r.run();
+ } finally {
+ releaseCacheModificationLock(owner, event);
+ }
+ }
+
+ private void doBasicPut(final RegionMapPutContext putInfo) {
+ try {
+ doWithIndexInUpdateMode(() -> {
+ do {
+ putInfo.setRegionEntry(null);
+ if (!findExistingEntry(putInfo)) {
+ return;
+ }
+ createNewEntryIfNeeded(putInfo);
+ } while (!addRegionEntryToMapAndDoPut(putInfo));
+ });
+ } catch (DiskAccessException dae) {
+ _getOwner().handleDiskAccessException(dae);
+ throw dae;
+ } finally {
+ doAfterPut(putInfo);
+ }
+ }
+
+ private void doWithIndexInUpdateMode(Runnable r) {
+ final IndexManager oqlIndexManager = getInitializedIndexManager();
+ if (oqlIndexManager != null) {
try {
- // Fix for Bug #44431. We do NOT want to update the region and wait
- // later for index INIT as region.clear() can cause inconsistency if
- // happened in parallel as it also does index INIT.
- oqlIndexManager = owner.getIndexManager();
- if (oqlIndexManager != null) {
- oqlIndexManager.waitForIndexInit();
- }
+ r.run();
+ } finally {
+ oqlIndexManager.countDownIndexUpdaters();
+ }
+ } else {
+ r.run();
+ }
+ }
- // fix for bug #42169, replace must go to server if entry not on client
- boolean replaceOnClient =
- event.getOperation() == Operation.REPLACE && owner.getServerProxy() != null;
- // Rather than having two different blocks for synchronizing oldRe
- // and newRe, have only one block and synchronize re
- RegionEntry re = null;
- boolean eventRecorded = false;
- boolean onlyExisting = ifOld && !replaceOnClient;
+ /**
+ * Stores the found entry in putInfo.getRegionEntry.
+ *
+ * @return false if an existing entry was not found and this put requires
+ * an existing one; otherwise returns true.
+ */
+ private boolean findExistingEntry(final RegionMapPutContext putInfo) {
+ final Object key = putInfo.getEvent().getKey();
+ RegionEntry re = getEntry(key);
+ if (putInfo.isOnlyExisting()) {
+ if (re == null || re.isTombstone()) {
+ return false;
+ }
+ }
+ putInfo.setRegionEntry(re);
+ return true;
+ }
- re = getOrCreateRegionEntry(owner, event,
+ /**
+ * Stores the created entry in putInfo.getRegionEntry.
+ */
+ private void createNewEntryIfNeeded(final RegionMapPutContext putInfo) {
+ putInfo.setCreate(putInfo.getRegionEntry() == null);
+ if (putInfo.isCreate()) {
+ final Object key = putInfo.getEvent().getKey();
+ RegionEntry newEntry = getEntryFactory().createEntry(_getOwner(), key, Token.REMOVED_PHASE1);
+ putInfo.setRegionEntry(newEntry);
+ }
+ }
- Token.REMOVED_PHASE1, null, onlyExisting, false);
- if (re == null) {
- return null;
- }
- while (true) {
- synchronized (re) {
- // if the re goes into removed2 state, it will be removed
- // from the map. otherwise we can append an event to it
- // and change its state
- if (re.isRemovedPhase2()) {
- _getOwner().getCachePerfStats().incRetries();
- getEntryMap().remove(event.getKey(), re);
- re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, onlyExisting,
- false);
- if (re == null) {
- // this will happen when onlyExisting is true
- return null;
- }
- continue;
- } else {
- @Released
- Object oldValueForDelta = null;
- if (retrieveOldValueForDelta) {
- // defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
- final boolean disabled = disableLruUpdateCallback();
- try {
- // Old value is faulted in from disk if not found in memory.
- oldValueForDelta = re.getValue(owner); // OFFHEAP: if we are synced on oldRe no
- // issue since we can use ARE's ref
- } finally {
- if (disabled) {
- enableLruUpdateCallback();
- }
- }
- }
+ /**
+ * @return false if caller should retry
+ */
+ protected boolean addRegionEntryToMapAndDoPut(final RegionMapPutContext putInfo) {
+ synchronized (putInfo.getRegionEntry()) {
+ putIfAbsentNewEntry(putInfo);
+ return doPutOnRegionEntry(putInfo);
+ }
+ }
- try {
+ protected void putIfAbsentNewEntry(final RegionMapPutContext putInfo) {
+ if (putInfo.isCreate()) {
+ RegionEntry oldRe = putEntryIfAbsent(putInfo.getEvent().getKey(), putInfo.getRegionEntry());
+ if (oldRe != null) {
+ putInfo.setCreate(false);
+ putInfo.setRegionEntry(oldRe);
+ }
+ }
+ }
- event.setRegionEntry(re);
- // set old value in event
- setOldValueInEvent(event, re, cacheWrite, requireOldValue);
- if (!continueUpdate(re, event, ifOld, replaceOnClient)) {
- return null;
- }
- // overwrite destroyed?
- if (!continueOverwriteDestroyed(re, event, overwriteDestroyed, ifNew)) {
- return null;
- }
- // check expectedOldValue
- if (!satisfiesExpectedOldValue(event, re, expectedOldValue, replaceOnClient)) {
- return null;
- }
- // invoke cacheWriter
- invokeCacheWriter(re, event, cacheWrite, cacheWriter, netWriteRecipients,
- requireOldValue, expectedOldValue, replaceOnClient);
+ /**
+ * @return false if caller should retry
+ */
+ private boolean doPutOnRegionEntry(final RegionMapPutContext putInfo) {
+ final RegionEntry re = putInfo.getRegionEntry();
- // notify index of an update
- notifyIndex(re, true);
- try {
- try {
- if ((cacheWrite && event.getOperation().isUpdate()) // if there is a
- // cacheWriter, type of
- // event has already been
- // set
- || !re.isRemoved() || replaceOnClient) {
- // update
- updateEntry(event, requireOldValue, oldValueForDelta, re);
- } else {
- // create
- createEntry(event, owner, re);
- }
- owner.recordEvent(event);
- eventRecorded = true;
- } catch (RegionClearedException rce) {
- clearOccured = true;
- owner.recordEvent(event);
- } catch (ConcurrentCacheModificationException ccme) {
- VersionTag tag = event.getVersionTag();
- if (tag != null && tag.isTimeStampUpdated()) {
- // Notify gateways of new time-stamp.
- owner.notifyTimestampsToGateways(event);
- }
- throw ccme;
- }
- if (uninitialized) {
- event.inhibitCacheListenerNotification(true);
- }
- updateLru(clearOccured, re, event);
+ synchronized (re) {
+ if (isRegionEntryRemoved(putInfo)) {
+ return false;
+ }
- lastModifiedTime = owner.basicPutPart2(event, re, !uninitialized,
- lastModifiedTime, clearOccured);
- } finally {
- notifyIndex(re, false);
- }
- result = re;
- break;
- } finally {
- OffHeapHelper.release(oldValueForDelta);
- if (re != null && !onlyExisting && !isOpComplete(re, event)) {
- owner.cleanUpOnIncompleteOp(event, re);
- } else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
- BucketRegion br = (BucketRegion) owner;
- CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
- }
- } // try
- }
- } // sync re
- } // end while
- } catch (DiskAccessException dae) {
- // Asif:Feel that it is safe to destroy the region here as there appears
- // to be no chance of deadlock during region destruction
- result = null;
- this._getOwner().handleDiskAccessException(dae);
- throw dae;
+ setOldValueForDelta(putInfo);
+ try {
+ setOldValueInEvent(putInfo);
+ doCreateOrUpdate(putInfo);
+ return true;
} finally {
- if (oqlIndexManager != null) {
- oqlIndexManager.countDownIndexUpdaters();
+ OffHeapHelper.release(putInfo.getOldValueForDelta());
+ putInfo.setOldValueForDelta(null);
+ if (!putInfo.isCompleted() && putInfo.isCreate()) {
+ // Region entry remove needs to be done while still synced on re.
+ removeEntry(putInfo.getEvent().getKey(), re, false);
}
- if (result != null) {
+ }
+ }
+ }
+
+ private IndexManager getInitializedIndexManager() {
+ IndexManager oqlIndexManager;
+ // Fix for Bug #44431. We do NOT want to update the region and wait
+ // later for index INIT as region.clear() can cause inconsistency if
+ // happened in parallel as it also does index INIT.
+ oqlIndexManager = _getOwner().getIndexManager();
+ if (oqlIndexManager != null) {
+ oqlIndexManager.waitForIndexInit();
+ }
+ return oqlIndexManager;
+ }
+
+ private void doAfterPut(RegionMapPutContext putInfo) {
+ if (putInfo.isCompleted()) {
+ final LocalRegion owner = _getOwner();
+ try {
+ final boolean invokeListeners = putInfo.getEvent().basicGetNewValue() != Token.TOMBSTONE;
+ owner.basicPutPart3(putInfo.getEvent(), putInfo.getRegionEntry(),
+ !putInfo.isUninitialized(), putInfo.getLastModifiedTime(), invokeListeners,
+ putInfo.isIfNew(), putInfo.isIfOld(), putInfo.getExpectedOldValue(),
+ putInfo.isRequireOldValue());
+ } finally {
+ if (!putInfo.getClearOccured()) {
try {
- // Note we do distribution after releasing all sync to avoid deadlock
- final boolean invokeListeners = event.basicGetNewValue() != Token.TOMBSTONE;
- owner.basicPutPart3(event, result, !uninitialized, lastModifiedTime, invokeListeners,
- ifNew, ifOld, expectedOldValue, requireOldValue);
- } finally {
- // bug 32589, post update may throw an exception if exception occurs
- // for any recipients
- if (!clearOccured) {
- try {
- lruUpdateCallback();
- } catch (DiskAccessException dae) {
- // Asif:Feel that it is safe to destroy the region here as there appears
- // to be no chance of deadlock during region destruction
- result = null;
- this._getOwner().handleDiskAccessException(dae);
- throw dae;
- }
- }
- } // finally
- } else {
- resetThreadLocals();
+ lruUpdateCallback();
+ } catch (DiskAccessException dae) {
+ owner.handleDiskAccessException(dae);
+ throw dae;
+ }
}
}
+ } else {
+ resetThreadLocals();
+ }
+ }
+
+ /**
+ * @return false if an early out check indicated that
+ * the put should not be done.
+ */
+ private boolean shouldPutContinue(final RegionMapPutContext putInfo) {
+ if (continueUpdate(putInfo) && continueOverwriteDestroyed(putInfo)
+ && satisfiesExpectedOldValue(putInfo)) {
+ return true;
+ }
+ return false;
+ }
+
+ private void doCreateOrUpdate(final RegionMapPutContext putInfo) {
+ if (!shouldPutContinue(putInfo)) {
+ return;
+ }
+ invokeCacheWriter(putInfo);
+
+ runWithIndexUpdatingInProgress(putInfo, () -> {
+ final EntryEventImpl event = putInfo.getEvent();
+ createOrUpdateEntry(putInfo);
+ if (putInfo.isUninitialized()) {
+ event.inhibitCacheListenerNotification(true);
+ }
+ updateLru(putInfo);
+
+ final RegionEntry re = putInfo.getRegionEntry();
+ long lastModTime = _getOwner().basicPutPart2(event, re, !putInfo.isUninitialized(),
+ putInfo.getLastModifiedTime(), putInfo.getClearOccured());
+ putInfo.setLastModifiedTime(lastModTime);
+ putInfo.setCompleted(true);
+ });
+ }
+
+ private void runWithIndexUpdatingInProgress(RegionMapPutContext putInfo, Runnable r) {
+ final RegionEntry re = putInfo.getRegionEntry();
+ notifyIndex(re, true);
+ try {
+ r.run();
} finally {
- releaseCacheModificationLock(owner, event);
+ notifyIndex(re, false);
+ }
+ }
+
+ private void createOrUpdateEntry(final RegionMapPutContext putInfo) {
+ final EntryEventImpl event = putInfo.getEvent();
+ final LocalRegion owner = _getOwner();
+ try {
+ if (isUpdate(putInfo)) {
+ updateEntry(putInfo);
+ } else {
+ createEntry(putInfo);
+ }
+ owner.recordEvent(event);
+ } catch (RegionClearedException rce) {
+ putInfo.setClearOccured(true);
+ owner.recordEvent(event);
+ } catch (ConcurrentCacheModificationException ccme) {
+ VersionTag tag = event.getVersionTag();
+ if (tag != null && tag.isTimeStampUpdated()) {
+ owner.notifyTimestampsToGateways(event);
+ }
+ throw ccme;
+ }
+ }
+
+ private boolean isUpdate(final RegionMapPutContext putInfo) {
+ if (putInfo.isCacheWrite() && putInfo.getEvent().getOperation().isUpdate()) {
+ // if there is a cacheWriter, type of event has already been set
+ return true;
+ }
+ if (putInfo.isReplaceOnClient()) {
+ return true;
+ }
+ if (!putInfo.getRegionEntry().isRemoved()) {
+ return true;
+ }
+ return false;
+ }
+
+ private void setOldValueForDelta(final RegionMapPutContext putInfo) {
+ if (putInfo.isRetrieveOldValueForDelta()) {
+ runWhileEvictionDisabled(() -> {
+ // Old value is faulted in from disk if not found in memory.
+ putInfo.setOldValueForDelta(putInfo.getRegionEntry().getValue(_getOwner()));
+ // OFFHEAP: if we are synced on region entry no issue since we can use ARE's ref
+ });
+ }
+ }
+
+ private void runWhileEvictionDisabled(Runnable r) {
+ final boolean disabled = disableLruUpdateCallback();
+ try {
+ r.run();
+ } finally {
+ if (disabled) {
+ enableLruUpdateCallback();
+ }
}
- return result;
}
/**
- * If the value in the VM is still REMOVED_PHASE1 Token, then the operation was not completed (due
- * to cacheWriter exception, concurrentMap operation) etc.
+ * If the re goes into removed2 state, it will be removed from the map.
+ *
+ * @return true if re was remove phase 2
*/
- private boolean isOpComplete(RegionEntry re, EntryEventImpl event) {
- if (re.getValueAsToken() == Token.REMOVED_PHASE1) {
+ private boolean isRegionEntryRemoved(final RegionMapPutContext putInfo) {
+ final RegionEntry re = putInfo.getRegionEntry();
+ if (re.isRemovedPhase2()) {
+ _getOwner().getCachePerfStats().incRetries();
+ getEntryMap().remove(putInfo.getEvent().getKey(), re);
+ return true;
+ } else {
return false;
}
- return true;
}
- private boolean satisfiesExpectedOldValue(EntryEventImpl event, RegionEntry re,
- Object expectedOldValue, boolean replaceOnClient) {
+ private boolean satisfiesExpectedOldValue(final RegionMapPutContext putInfo) {
// replace is propagated to server, so no need to check
// satisfiesOldValue on client
- if (expectedOldValue != null && !replaceOnClient) {
+ final EntryEventImpl event = putInfo.getEvent();
+ if (putInfo.getExpectedOldValue() != null && !putInfo.isReplaceOnClient()) {
assert event.getOperation().guaranteesOldValue();
// We already called setOldValueInEvent so the event will have the old value.
@Unretained
@@ -2445,7 +2470,8 @@ public abstract class AbstractRegionMap
// Note that v will be null instead of INVALID because setOldValue
// converts INVALID to null.
// But checkExpectedOldValue handle this and says INVALID equals null.
- if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v, event.getRegion())) {
+ if (!AbstractRegionEntry.checkExpectedOldValue(putInfo.getExpectedOldValue(), v,
+ event.getRegion())) {
return false;
}
}
@@ -2453,10 +2479,12 @@ public abstract class AbstractRegionMap
}
// PRECONDITION: caller must be synced on re
- private void setOldValueInEvent(EntryEventImpl event, RegionEntry re, boolean cacheWrite,
- boolean requireOldValue) {
- boolean needToSetOldValue =
- cacheWrite || requireOldValue || event.getOperation().guaranteesOldValue();
+ private void setOldValueInEvent(final RegionMapPutContext putInfo) {
+ final EntryEventImpl event = putInfo.getEvent();
+ final RegionEntry re = putInfo.getRegionEntry();
+ event.setRegionEntry(re);
+ boolean needToSetOldValue = putInfo.isCacheWrite() || putInfo.isRequireOldValue()
+ || event.getOperation().guaranteesOldValue();
if (needToSetOldValue) {
if (event.getOperation().guaranteesOldValue()) {
// In these cases we want to even get the old value from disk if it is not in memory
@@ -2502,8 +2530,10 @@ public abstract class AbstractRegionMap
}
}
- protected void createEntry(EntryEventImpl event, final LocalRegion owner, RegionEntry re)
- throws RegionClearedException {
+ private void createEntry(final RegionMapPutContext putInfo) throws RegionClearedException {
+ final LocalRegion owner = _getOwner();
+ final EntryEventImpl event = putInfo.getEvent();
+ final RegionEntry re = putInfo.getRegionEntry();
final boolean wasTombstone = re.isTombstone();
processVersionTag(re, event);
event.putNewEntry(owner, re);
@@ -2513,22 +2543,24 @@ public abstract class AbstractRegionMap
}
}
- protected void updateEntry(EntryEventImpl event, boolean requireOldValue, Object oldValueForDelta,
- RegionEntry re) throws RegionClearedException {
- final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
+ private void updateEntry(final RegionMapPutContext putInfo) throws RegionClearedException {
+ final EntryEventImpl event = putInfo.getEvent();
+ final RegionEntry re = putInfo.getRegionEntry();
final boolean wasTombstone = re.isTombstone();
+ final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
processVersionTag(re, event);
- event.putExistingEntry(event.getRegion(), re, requireOldValue, oldValueForDelta);
+ event.putExistingEntry(event.getRegion(), re, putInfo.isRequireOldValue(),
+ putInfo.getOldValueForDelta());
EntryLogger.logPut(event);
updateSize(event, oldSize, true/* isUpdate */, wasTombstone);
}
- private void updateLru(boolean clearOccured, RegionEntry re, EntryEventImpl event) {
- if (!clearOccured) {
- if (event.getOperation().isCreate()) {
- lruEntryCreate(re);
+ private void updateLru(final RegionMapPutContext putInfo) {
+ if (!putInfo.getClearOccured()) {
+ if (putInfo.getEvent().getOperation().isCreate()) {
+ lruEntryCreate(putInfo.getRegionEntry());
} else {
- lruEntryUpdate(re);
+ lruEntryUpdate(putInfo.getRegionEntry());
}
}
}
@@ -2551,52 +2583,50 @@ public abstract class AbstractRegionMap
}
}
- private void invokeCacheWriter(RegionEntry re, EntryEventImpl event, boolean cacheWrite,
- CacheWriter cacheWriter, Set netWriteRecipients, boolean requireOldValue,
- Object expectedOldValue, boolean replaceOnClient) {
+ private void invokeCacheWriter(RegionMapPutContext putInfo) {
+ final EntryEventImpl event = putInfo.getEvent();
// invoke listeners only if region is initialized
- if (_getOwner().isInitialized() && cacheWrite) {
+ if (_getOwner().isInitialized() && putInfo.isCacheWrite()) {
// event.setOldValue already called in setOldValueInEvent
// bug #42638 for replaceOnClient, do not make the event create
// or update since replace must propagate to server
- if (!replaceOnClient) {
- if (re.isDestroyedOrRemoved()) {
+ if (!putInfo.isReplaceOnClient()) {
+ if (putInfo.getRegionEntry().isDestroyedOrRemoved()) {
event.makeCreate();
} else {
event.makeUpdate();
}
}
- _getOwner().cacheWriteBeforePut(event, netWriteRecipients, cacheWriter, requireOldValue,
- expectedOldValue);
+ _getOwner().cacheWriteBeforePut(event, putInfo.getNetWriteRecipients(),
+ putInfo.getCacheWriter(), putInfo.isRequireOldValue(), putInfo.getExpectedOldValue());
}
- if (!_getOwner().isInitialized() && !cacheWrite) {
- // block setting of old value in putNewValueNoSync, don't
- // need it
+ if (!_getOwner().isInitialized() && !putInfo.isCacheWrite()) {
+ // block setting of old value in putNewValueNoSync, don't need it
event.oldValueNotAvailable();
}
}
- private boolean continueOverwriteDestroyed(RegionEntry re, EntryEventImpl event,
- boolean overwriteDestroyed, boolean ifNew) {
- Token oldValueInVM = re.getValueAsToken();
+ private boolean continueOverwriteDestroyed(final RegionMapPutContext putInfo) {
+ Token oldValueInVM = putInfo.getRegionEntry().getValueAsToken();
// if region is under GII, check if token is destroyed
- if (!overwriteDestroyed) {
+ if (!putInfo.isOverwriteDestroyed()) {
if (!_getOwner().isInitialized()
&& (oldValueInVM == Token.DESTROYED || oldValueInVM == Token.TOMBSTONE)) {
- event.setOldValueDestroyedToken();
+ putInfo.getEvent().setOldValueDestroyedToken();
return false;
}
}
- if (ifNew && !Token.isRemoved(oldValueInVM)) {
+ if (putInfo.isIfNew() && !Token.isRemoved(oldValueInVM)) {
return false;
}
return true;
}
- private boolean continueUpdate(RegionEntry re, EntryEventImpl event, boolean ifOld,
- boolean replaceOnClient) {
- if (ifOld) {
+ private boolean continueUpdate(final RegionMapPutContext putInfo) {
+ if (putInfo.isIfOld()) {
+ final EntryEventImpl event = putInfo.getEvent();
+ final RegionEntry re = putInfo.getRegionEntry();
// only update, so just do tombstone maintainence and exit
if (re.isTombstone() && event.getVersionTag() != null) {
// refresh the tombstone so it doesn't time out too soon
@@ -2610,7 +2640,7 @@ public abstract class AbstractRegionMap
_getOwner().rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
return false;
}
- if (re.isRemoved() && !replaceOnClient) {
+ if (re.isRemoved() && !putInfo.isReplaceOnClient()) {
return false;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index acb5e06..45a896f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -7322,11 +7322,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- void cleanUpOnIncompleteOp(EntryEventImpl event, RegionEntry regionEntry) {
- // Ok to remove entry as index has not been modified yet by the operation
- this.entries.removeEntry(event.getKey(), regionEntry, false);
- }
-
public static void validateRegionName(String name, InternalRegionArguments internalRegionArgs) {
if (name == null) {
throw new IllegalArgumentException(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java
new file mode 100644
index 0000000..f8a2778
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import java.util.Set;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.offheap.annotations.Released;
+
+/**
+ * This class is used to hold a bunch of immutable items
+ * used by AbstractRegionMap when doing a "put" operation.
+ * It also has a few mutable items updates while a "put"
+ * operation is being done.
+ */
+public class RegionMapPutContext {
+ private final EntryEventImpl event;
+ private final boolean ifNew;
+ private final boolean ifOld;
+ private final boolean overwriteDestroyed;
+ private final boolean requireOldValue;
+ private final boolean uninitialized;
+ private final boolean retrieveOldValueForDelta;
+ private final boolean replaceOnClient;
+ private final boolean onlyExisting;
+ private final boolean cacheWrite;
+ private final CacheWriter cacheWriter;
+ private final Set netWriteRecipients;
+ private final Object expectedOldValue;
+ private boolean clearOccured;
+ private long lastModifiedTime;
+ private RegionEntry regionEntry;
+ private boolean create;
+ private boolean completed;
+ @Released
+ private Object oldValueForDelta;
+
+ public RegionMapPutContext(LocalRegion owner, EntryEventImpl event, boolean ifNew, boolean ifOld,
+ boolean overwriteDestroyed, boolean requireOldValue,
+
+ Object expectedOldValue) {
+ if (owner == null) {
+ // "fix" for bug 32440
+ Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
+ }
+ this.event = event;
+ this.ifNew = ifNew;
+ this.ifOld = ifOld;
+ this.overwriteDestroyed = overwriteDestroyed;
+ this.requireOldValue = requireOldValue;
+ this.uninitialized = !owner.isInitialized();
+ this.retrieveOldValueForDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null;
+ this.replaceOnClient =
+ event.getOperation() == Operation.REPLACE && owner.getServerProxy() != null;
+ this.onlyExisting = ifOld && !isReplaceOnClient();
+ this.cacheWriter = owner.basicGetWriter();
+ this.cacheWrite = !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
+ && (getCacheWriter() != null || owner.hasServerProxy() || owner.getScope().isDistributed());
+ this.expectedOldValue = expectedOldValue;
+ if (isCacheWrite()) {
+ if (getCacheWriter() == null && owner.getScope().isDistributed()) {
+ this.netWriteRecipients =
+ ((DistributedRegion) owner).getCacheDistributionAdvisor().adviseNetWrite();
+ } else {
+ this.netWriteRecipients = null;
+ }
+ } else {
+ this.netWriteRecipients = null;
+ }
+ }
+
+ public boolean isIfNew() {
+ return ifNew;
+ }
+
+ public boolean isIfOld() {
+ return ifOld;
+ }
+
+ public boolean isOverwriteDestroyed() {
+ return overwriteDestroyed;
+ }
+
+ public boolean isRequireOldValue() {
+ return requireOldValue;
+ }
+
+ public boolean isUninitialized() {
+ return uninitialized;
+ }
+
+ public boolean isRetrieveOldValueForDelta() {
+ return retrieveOldValueForDelta;
+ }
+
+ public boolean isReplaceOnClient() {
+ return replaceOnClient;
+ }
+
+ public boolean isOnlyExisting() {
+ return onlyExisting;
+ }
+
+ public boolean isCacheWrite() {
+ return cacheWrite;
+ }
+
+ public CacheWriter getCacheWriter() {
+ return cacheWriter;
+ }
+
+ public Set getNetWriteRecipients() {
+ return netWriteRecipients;
+ }
+
+ public Object getExpectedOldValue() {
+ return expectedOldValue;
+ }
+
+ public boolean getClearOccured() {
+ return clearOccured;
+ }
+
+ public void setClearOccured(boolean clearOccured) {
+ this.clearOccured = clearOccured;
+ }
+
+ public long getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ public void setLastModifiedTime(long lastModifiedTime) {
+ this.lastModifiedTime = lastModifiedTime;
+ }
+
+ public RegionEntry getRegionEntry() {
+ return regionEntry;
+ }
+
+ public void setRegionEntry(RegionEntry regionEntry) {
+ this.regionEntry = regionEntry;
+ }
+
+ /**
+ * @return true if put created a new entry; false if it updated an existing one.
+ */
+ public boolean isCreate() {
+ return create;
+ }
+
+ public void setCreate(boolean v) {
+ this.create = v;
+ }
+
+ public EntryEventImpl getEvent() {
+ return event;
+ }
+
+ public boolean isCompleted() {
+ return this.completed;
+ }
+
+ public void setCompleted(boolean b) {
+ this.completed = b;
+ }
+
+ public Object getOldValueForDelta() {
+ return this.oldValueForDelta;
+ }
+
+ public void setOldValueForDelta(Object value) {
+ this.oldValueForDelta = value;
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index d3053b0..ed5ccf2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -30,6 +32,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -38,10 +47,12 @@ import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.eviction.EvictableEntry;
import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.eviction.EvictionCounters;
+import org.apache.geode.internal.cache.map.RegionMapPutContext;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionHolder;
import org.apache.geode.internal.cache.versions.VersionTag;
@@ -105,6 +116,7 @@ public class AbstractRegionMapTest {
// invalidate on region that is not initialized should create
// entry in map as invalid.
+ when(arm._getOwner().isInitialized()).thenReturn(false);
assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
.isInstanceOf(EntryNotFoundException.class);
@@ -122,6 +134,7 @@ public class AbstractRegionMapTest {
// invalidate on region that is not initialized should create
// entry in map as invalid.
+ when(arm._getOwner().isInitialized()).thenReturn(false);
assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
.isInstanceOf(EntryNotFoundException.class);
@@ -158,6 +171,7 @@ public class AbstractRegionMapTest {
// invalidate on region that is not initialized should create
// entry in map as invalid.
+ when(arm._getOwner().isInitialized()).thenReturn(false);
assertTrue(arm.invalidate(event, true, true, false));
verify(arm._getOwner(), times(1)).basicInvalidatePart2(any(), any(), anyBoolean(),
anyBoolean());
@@ -728,6 +742,139 @@ public class AbstractRegionMapTest {
assertEquals(re.getValueAsToken(), token);
}
+ @Test
+ public void updateOnEmptyMapReturnsNull() {
+ final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+ final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+
+ RegionEntry result = arm.basicPut(event, 0L, false, true, null, false, false);
+
+ assertThat(result).isNull();
+ verify(arm._getOwner(), never()).basicPutPart2(any(), any(), anyBoolean(), anyLong(),
+ anyBoolean());
+ verify(arm._getOwner(), never()).basicPutPart3(any(), any(), anyBoolean(), anyLong(),
+ anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
+ }
+
+ @Test
+ public void createOnExistingEntryReturnsNull() {
+ final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+ // do a create to get a region entry in the map
+ arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true, false, null, false, false);
+ final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+ final boolean ifNew = true;
+ final boolean ifOld = false;
+ final boolean requireOldValue = false;
+ final Object expectedOldValue = null;
+
+ RegionEntry result =
+ arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+ assertThat(result).isNull();
+ verify(arm._getOwner(), never()).basicPutPart2(eq(event), any(), anyBoolean(), anyLong(),
+ anyBoolean());
+ verify(arm._getOwner(), never()).basicPutPart3(eq(event), any(), anyBoolean(), anyLong(),
+ anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
+ }
+
+ @Test
+ public void createOnEntryReturnedFromPutIfAbsentDoesNothing() {
+ CustomEntryConcurrentHashMap<String, RegionEntry> map =
+ mock(CustomEntryConcurrentHashMap.class);
+ RegionEntry entry = mock(RegionEntry.class);
+ when(entry.getValueAsToken()).thenReturn(Token.NOT_A_TOKEN);
+ when(map.get(KEY)).thenReturn(null);
+ when(map.putIfAbsent((String) eq(KEY), any())).thenReturn(entry);
+ final TestableAbstractRegionMap arm = new TestableAbstractRegionMap(false, map, null);
+ final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+ final boolean ifNew = true;
+ final boolean ifOld = false;
+ final boolean requireOldValue = false;
+ final Object expectedOldValue = null;
+
+ RegionEntry result =
+ arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+ assertThat(result).isNull();
+ verify(arm._getOwner(), never()).basicPutPart2(eq(event), any(), anyBoolean(), anyLong(),
+ anyBoolean());
+ verify(arm._getOwner(), never()).basicPutPart3(eq(event), any(), anyBoolean(), anyLong(),
+ anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
+ }
+
+ @Test
+ public void createOnExistingEntryWithRemovePhase2DoesCreate() throws RegionClearedException {
+ final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+ // do a create to get a region entry in the map
+ RegionEntry createdEntry = arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true,
+ false, null, false, false);
+ createdEntry.setValue(null, Token.REMOVED_PHASE2);
+ final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+ event.setNewValue("create");
+ final boolean ifNew = true;
+ final boolean ifOld = false;
+ final boolean requireOldValue = false;
+ final Object expectedOldValue = null;
+
+ RegionEntry result =
+ arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+
+ assertThat(result).isNotNull();
+ assertThat(result).isNotSameAs(createdEntry);
+ assertThat(result.getKey()).isEqualTo("key");
+ assertThat(result.getValue()).isEqualTo("create");
+ verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
+ eq(false));
+ verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
+ eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
+ }
+
+
+ @Test
+ public void updateOnExistingEntryDoesUpdate() {
+ final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+ // do a create to get a region entry in the map
+ RegionEntry createdEntry = arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true,
+ false, null, false, false);
+ final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+ event.setNewValue("update");
+ final boolean ifNew = false;
+ final boolean ifOld = true;
+ final boolean requireOldValue = false;
+ final Object expectedOldValue = null;
+
+ RegionEntry result =
+ arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+
+ assertThat(result).isSameAs(createdEntry);
+ assertThat(result.getKey()).isEqualTo("key");
+ assertThat(result.getValue()).isEqualTo("update");
+ verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
+ eq(false));
+ verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
+ eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
+ }
+
+ @Test
+ public void createOnEmptyMapAddsEntry() {
+ final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+ final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+ event.setNewValue("value");
+ final boolean ifNew = true;
+ final boolean ifOld = false;
+ final boolean requireOldValue = false;
+ final Object expectedOldValue = null;
+
+ RegionEntry result =
+ arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+
+ assertThat(result).isNotNull();
+ assertThat(result.getKey()).isEqualTo("key");
+ assertThat(result.getValue()).isEqualTo("value");
+ verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
+ eq(false));
+ verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
+ eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
+ }
+
/**
* TestableAbstractRegionMap
*/
@@ -749,6 +896,8 @@ public class AbstractRegionMapTest {
when(owner.getCachePerfStats()).thenReturn(cachePerfStats);
when(owner.getConcurrencyChecksEnabled()).thenReturn(withConcurrencyChecks);
when(owner.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+ when(owner.getScope()).thenReturn(Scope.LOCAL);
+ when(owner.isInitialized()).thenReturn(true);
doThrow(EntryNotFoundException.class).when(owner).checkEntryNotFound(any());
initialize(owner, new Attributes(), null, false);
if (map != null) {
@@ -760,6 +909,88 @@ public class AbstractRegionMapTest {
}
}
+ @Test
+ public void verifyConcurrentCreateHasCorrectResult() throws Exception {
+ CountDownLatch firstCreateAddedUninitializedEntry = new CountDownLatch(1);
+ CountDownLatch secondCreateFoundFirstCreatesEntry = new CountDownLatch(1);
+ TestableBasicPutMap arm = new TestableBasicPutMap(firstCreateAddedUninitializedEntry,
+ secondCreateFoundFirstCreatesEntry);
+ // The key needs to be long enough to not be stored inline on the region entry.
+ String key1 = "lonGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGkey";
+ String key2 = new String(key1);
+
+ Future<RegionEntry> future = doFirstCreateInAnotherThread(arm, key1);
+ if (!firstCreateAddedUninitializedEntry.await(5, TimeUnit.SECONDS)) {
+ // something is wrong with the other thread
+ // so count down the latch it may be waiting
+ // on and then call get to see what went wrong with him.
+ secondCreateFoundFirstCreatesEntry.countDown();
+ fail("other thread took too long. It returned " + future.get());
+ }
+ EntryEventImpl event = createEventForCreate(arm._getOwner(), key2);
+ // now do the second create
+ RegionEntry result = arm.basicPut(event, 0L, true, false, null, false, false);
+
+ RegionEntry resultFromOtherThread = future.get();
+
+ assertThat(result).isNull();
+ assertThat(resultFromOtherThread).isNotNull();
+ assertThat(resultFromOtherThread.getKey()).isSameAs(key1);
+ }
+
+ private EntryEventImpl createEventForCreate(LocalRegion lr, String key) {
+ when(lr.getKeyInfo(key)).thenReturn(new KeyInfo(key, null, null));
+ EntryEventImpl event =
+ EntryEventImpl.create(lr, Operation.CREATE, key, false, null, true, false);
+ event.setNewValue("create_value");
+ return event;
+ }
+
+ private Future<RegionEntry> doFirstCreateInAnotherThread(TestableBasicPutMap arm, String key) {
+ Future<RegionEntry> result = CompletableFuture.supplyAsync(() -> {
+ EntryEventImpl event = createEventForCreate(arm._getOwner(), key);
+ return arm.basicPut(event, 0L, true, false, null, false, false);
+ });
+ return result;
+ }
+
+ private static class TestableBasicPutMap extends TestableAbstractRegionMap {
+ private final CountDownLatch firstCreateAddedUninitializedEntry;
+ private final CountDownLatch secondCreateFoundFirstCreatesEntry;
+ private boolean alreadyCalledPutIfAbsentNewEntry;
+ private boolean alreadyCalledAddRegionEntryToMapAndDoPut;
+
+ public TestableBasicPutMap(CountDownLatch removePhase1Completed,
+ CountDownLatch secondCreateFoundFirstCreatesEntry) {
+ super();
+ this.firstCreateAddedUninitializedEntry = removePhase1Completed;
+ this.secondCreateFoundFirstCreatesEntry = secondCreateFoundFirstCreatesEntry;
+ }
+
+ @Override
+ protected boolean addRegionEntryToMapAndDoPut(final RegionMapPutContext putInfo) {
+ if (!alreadyCalledAddRegionEntryToMapAndDoPut) {
+ alreadyCalledAddRegionEntryToMapAndDoPut = true;
+ } else {
+ this.secondCreateFoundFirstCreatesEntry.countDown();
+ }
+ return super.addRegionEntryToMapAndDoPut(putInfo);
+ }
+
+ @Override
+ protected void putIfAbsentNewEntry(final RegionMapPutContext putInfo) {
+ super.putIfAbsentNewEntry(putInfo);
+ if (!alreadyCalledPutIfAbsentNewEntry) {
+ alreadyCalledPutIfAbsentNewEntry = true;
+ this.firstCreateAddedUninitializedEntry.countDown();
+ try {
+ this.secondCreateFoundFirstCreatesEntry.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ }
+
/**
* TestableVMLRURegionMap
*/
--
To stop receiving notification emails like this one, please contact
dschneider@apache.org.