You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/04/12 18:34:09 UTC

[geode] branch develop updated: GEODE-4957: fix race in concurrent create on region (#1750)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 65b52f2  GEODE-4957: fix race in concurrent create on region (#1750)
65b52f2 is described below

commit 65b52f28f4e11b9f5ec82b0e457fc0292132e5da
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Apr 12 11:34:02 2018 -0700

    GEODE-4957: fix race in concurrent create on region (#1750)
    
    Refactored basicPut so that a RegionEntry with REMOVE_PHASE1
    is added to the map, it will already be synchronized.
    This prevents a second concurrent threads from "stealing" it and
    creating a region entry with a key that is the same but has a different
    identity.
---
 .../geode/internal/cache/AbstractRegionMap.java    | 576 +++++++++++----------
 .../apache/geode/internal/cache/LocalRegion.java   |   5 -
 .../internal/cache/map/RegionMapPutContext.java    | 194 +++++++
 .../internal/cache/AbstractRegionMapTest.java      | 231 +++++++++
 4 files changed, 728 insertions(+), 278 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 493410b..2c57182 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -29,7 +29,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.GemFireIOException;
 import org.apache.geode.InvalidDeltaException;
 import org.apache.geode.cache.CacheEvent;
-import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryNotFoundException;
@@ -59,6 +58,7 @@ import org.apache.geode.internal.cache.ha.HARegionQueue;
 import org.apache.geode.internal.cache.map.CacheModificationLock;
 import org.apache.geode.internal.cache.map.FocusedRegionMap;
 import org.apache.geode.internal.cache.map.RegionMapDestroy;
+import org.apache.geode.internal.cache.map.RegionMapPutContext;
 import org.apache.geode.internal.cache.persistence.DiskRegionView;
 import org.apache.geode.internal.cache.region.entry.RegionEntryFactoryBuilder;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
@@ -72,7 +72,6 @@ import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.versions.VersionStamp;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
-import org.apache.geode.internal.concurrent.MapCallbackAdapter;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -2165,279 +2164,305 @@ public abstract class AbstractRegionMap
     }
   }
 
-  private RegionEntry getOrCreateRegionEntry(Object ownerRegion, EntryEventImpl event, Object value,
-      MapCallbackAdapter<Object, Object, Object, Object> valueCreator, boolean onlyExisting,
-      boolean returnTombstone) {
-    Object key = event.getKey();
-    RegionEntry retVal = null;
-    retVal = getEntry(event);
-    if (onlyExisting) {
-      if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
-        return null;
-      }
-      return retVal;
-    }
-    if (retVal != null) {
-      return retVal;
-    }
-    if (valueCreator != null) {
-      value = valueCreator.newValue(key, ownerRegion, value, null);
-    }
-    retVal = getEntryFactory().createEntry((RegionEntryContext) ownerRegion, key, value);
-    RegionEntry oldRe = putEntryIfAbsent(key, retVal);
-    if (oldRe != null) {
-      if (retVal instanceof OffHeapRegionEntry) {
-        ((OffHeapRegionEntry) retVal).release();
-      }
-      return oldRe;
-    }
-    return retVal;
-  }
-
   /*
    * returns null if the operation fails
    */
   @Override
-  public RegionEntry basicPut(EntryEventImpl event, final long lastModified, final boolean ifNew,
-      final boolean ifOld, Object expectedOldValue, // only non-null if ifOld
-      boolean requireOldValue, final boolean overwriteDestroyed)
+  public RegionEntry basicPut(EntryEventImpl event, final long unused, final boolean ifNew,
+      final boolean ifOld, final Object expectedOldValue, // only non-null if ifOld
+      final boolean requireOldValue, final boolean overwriteDestroyed)
       throws CacheWriterException, TimeoutException {
 
     final LocalRegion owner = _getOwner();
+    final RegionMapPutContext putInfo = new RegionMapPutContext(owner, event, ifNew, ifOld,
+        overwriteDestroyed, requireOldValue, expectedOldValue);
 
     entryEventSerialization.serializeNewValueIfNeeded(owner, event);
-
-    boolean clearOccured = false;
-    if (owner == null) {
-      // "fix" for bug 32440
-      Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
-    }
-    if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT_VERBOSE)
-        && !(owner instanceof HARegion)) {
-      logger.trace(LogMarker.LRU_TOMBSTONE_COUNT_VERBOSE,
-          "ARM.basicPut called for {} expectedOldValue={} requireOldValue={} ifNew={} ifOld={} initialized={} overwriteDestroyed={}",
-          event, expectedOldValue, requireOldValue, ifNew, ifOld, owner.isInitialized(),
-          overwriteDestroyed);
-    }
-
-    RegionEntry result = null;
-    long lastModifiedTime = 0;
-    // copy into local var to prevent race condition with setter
-    final CacheWriter cacheWriter = owner.basicGetWriter();
-    final boolean cacheWrite =
-        !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
-            && (cacheWriter != null || owner.hasServerProxy() || owner.scope.isDistributed());
-    /*
-     * For performance reason, we try to minimize object creation and do as much work as we can
-     * outside of synchronization, especially getting distribution advice.
-     */
-    final Set netWriteRecipients;
-    if (cacheWrite) {
-      if (cacheWriter == null && owner.scope.isDistributed()) {
-        netWriteRecipients =
-            ((DistributedRegion) owner).getCacheDistributionAdvisor().adviseNetWrite();
-      } else {
-        netWriteRecipients = null;
-      }
+    runWhileLockedForCacheModification(event, () -> doBasicPut(putInfo));
+    if (putInfo.isCompleted()) {
+      return putInfo.getRegionEntry();
     } else {
-      netWriteRecipients = null;
+      return null;
     }
+  }
 
-    // mbid: this has been added to maintain consistency between the disk region
-    // and the region map after clear() has been called. This will set the
-    // reference of the diskSegmentRegion as a ThreadLocal so that if the diskRegionSegment
-    // is later changed by another thread, we can do the necessary.
-    boolean uninitialized = !owner.isInitialized();
-    boolean retrieveOldValueForDelta =
-        event.getDeltaBytes() != null && event.getRawNewValue() == null;
-    IndexManager oqlIndexManager = null;
+  private void runWhileLockedForCacheModification(EntryEventImpl event, Runnable r) {
+    final LocalRegion owner = _getOwner();
     lockForCacheModification(owner, event);
     try {
+      r.run();
+    } finally {
+      releaseCacheModificationLock(owner, event);
+    }
+  }
+
+  private void doBasicPut(final RegionMapPutContext putInfo) {
+    try {
+      doWithIndexInUpdateMode(() -> {
+        do {
+          putInfo.setRegionEntry(null);
+          if (!findExistingEntry(putInfo)) {
+            return;
+          }
+          createNewEntryIfNeeded(putInfo);
+        } while (!addRegionEntryToMapAndDoPut(putInfo));
+      });
+    } catch (DiskAccessException dae) {
+      _getOwner().handleDiskAccessException(dae);
+      throw dae;
+    } finally {
+      doAfterPut(putInfo);
+    }
+  }
+
+  private void doWithIndexInUpdateMode(Runnable r) {
+    final IndexManager oqlIndexManager = getInitializedIndexManager();
+    if (oqlIndexManager != null) {
       try {
-        // Fix for Bug #44431. We do NOT want to update the region and wait
-        // later for index INIT as region.clear() can cause inconsistency if
-        // happened in parallel as it also does index INIT.
-        oqlIndexManager = owner.getIndexManager();
-        if (oqlIndexManager != null) {
-          oqlIndexManager.waitForIndexInit();
-        }
+        r.run();
+      } finally {
+        oqlIndexManager.countDownIndexUpdaters();
+      }
+    } else {
+      r.run();
+    }
+  }
 
-        // fix for bug #42169, replace must go to server if entry not on client
-        boolean replaceOnClient =
-            event.getOperation() == Operation.REPLACE && owner.getServerProxy() != null;
-        // Rather than having two different blocks for synchronizing oldRe
-        // and newRe, have only one block and synchronize re
-        RegionEntry re = null;
-        boolean eventRecorded = false;
-        boolean onlyExisting = ifOld && !replaceOnClient;
+  /**
+   * Stores the found entry in putInfo.getRegionEntry.
+   *
+   * @return false if an existing entry was not found and this put requires
+   *         an existing one; otherwise returns true.
+   */
+  private boolean findExistingEntry(final RegionMapPutContext putInfo) {
+    final Object key = putInfo.getEvent().getKey();
+    RegionEntry re = getEntry(key);
+    if (putInfo.isOnlyExisting()) {
+      if (re == null || re.isTombstone()) {
+        return false;
+      }
+    }
+    putInfo.setRegionEntry(re);
+    return true;
+  }
 
-        re = getOrCreateRegionEntry(owner, event,
+  /**
+   * Stores the created entry in putInfo.getRegionEntry.
+   */
+  private void createNewEntryIfNeeded(final RegionMapPutContext putInfo) {
+    putInfo.setCreate(putInfo.getRegionEntry() == null);
+    if (putInfo.isCreate()) {
+      final Object key = putInfo.getEvent().getKey();
+      RegionEntry newEntry = getEntryFactory().createEntry(_getOwner(), key, Token.REMOVED_PHASE1);
+      putInfo.setRegionEntry(newEntry);
+    }
+  }
 
-            Token.REMOVED_PHASE1, null, onlyExisting, false);
-        if (re == null) {
-          return null;
-        }
-        while (true) {
-          synchronized (re) {
-            // if the re goes into removed2 state, it will be removed
-            // from the map. otherwise we can append an event to it
-            // and change its state
-            if (re.isRemovedPhase2()) {
-              _getOwner().getCachePerfStats().incRetries();
-              getEntryMap().remove(event.getKey(), re);
-              re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, onlyExisting,
-                  false);
-              if (re == null) {
-                // this will happen when onlyExisting is true
-                return null;
-              }
-              continue;
-            } else {
-              @Released
-              Object oldValueForDelta = null;
-              if (retrieveOldValueForDelta) {
-                // defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
-                final boolean disabled = disableLruUpdateCallback();
-                try {
-                  // Old value is faulted in from disk if not found in memory.
-                  oldValueForDelta = re.getValue(owner); // OFFHEAP: if we are synced on oldRe no
-                                                         // issue since we can use ARE's ref
-                } finally {
-                  if (disabled) {
-                    enableLruUpdateCallback();
-                  }
-                }
-              }
+  /**
+   * @return false if caller should retry
+   */
+  protected boolean addRegionEntryToMapAndDoPut(final RegionMapPutContext putInfo) {
+    synchronized (putInfo.getRegionEntry()) {
+      putIfAbsentNewEntry(putInfo);
+      return doPutOnRegionEntry(putInfo);
+    }
+  }
 
-              try {
+  protected void putIfAbsentNewEntry(final RegionMapPutContext putInfo) {
+    if (putInfo.isCreate()) {
+      RegionEntry oldRe = putEntryIfAbsent(putInfo.getEvent().getKey(), putInfo.getRegionEntry());
+      if (oldRe != null) {
+        putInfo.setCreate(false);
+        putInfo.setRegionEntry(oldRe);
+      }
+    }
+  }
 
-                event.setRegionEntry(re);
-                // set old value in event
-                setOldValueInEvent(event, re, cacheWrite, requireOldValue);
-                if (!continueUpdate(re, event, ifOld, replaceOnClient)) {
-                  return null;
-                }
-                // overwrite destroyed?
-                if (!continueOverwriteDestroyed(re, event, overwriteDestroyed, ifNew)) {
-                  return null;
-                }
-                // check expectedOldValue
-                if (!satisfiesExpectedOldValue(event, re, expectedOldValue, replaceOnClient)) {
-                  return null;
-                }
-                // invoke cacheWriter
-                invokeCacheWriter(re, event, cacheWrite, cacheWriter, netWriteRecipients,
-                    requireOldValue, expectedOldValue, replaceOnClient);
+  /**
+   * @return false if caller should retry
+   */
+  private boolean doPutOnRegionEntry(final RegionMapPutContext putInfo) {
+    final RegionEntry re = putInfo.getRegionEntry();
 
-                // notify index of an update
-                notifyIndex(re, true);
-                try {
-                  try {
-                    if ((cacheWrite && event.getOperation().isUpdate()) // if there is a
-                                                                        // cacheWriter, type of
-                                                                        // event has already been
-                                                                        // set
-                        || !re.isRemoved() || replaceOnClient) {
-                      // update
-                      updateEntry(event, requireOldValue, oldValueForDelta, re);
-                    } else {
-                      // create
-                      createEntry(event, owner, re);
-                    }
-                    owner.recordEvent(event);
-                    eventRecorded = true;
-                  } catch (RegionClearedException rce) {
-                    clearOccured = true;
-                    owner.recordEvent(event);
-                  } catch (ConcurrentCacheModificationException ccme) {
-                    VersionTag tag = event.getVersionTag();
-                    if (tag != null && tag.isTimeStampUpdated()) {
-                      // Notify gateways of new time-stamp.
-                      owner.notifyTimestampsToGateways(event);
-                    }
-                    throw ccme;
-                  }
-                  if (uninitialized) {
-                    event.inhibitCacheListenerNotification(true);
-                  }
-                  updateLru(clearOccured, re, event);
+    synchronized (re) {
+      if (isRegionEntryRemoved(putInfo)) {
+        return false;
+      }
 
-                  lastModifiedTime = owner.basicPutPart2(event, re, !uninitialized,
-                      lastModifiedTime, clearOccured);
-                } finally {
-                  notifyIndex(re, false);
-                }
-                result = re;
-                break;
-              } finally {
-                OffHeapHelper.release(oldValueForDelta);
-                if (re != null && !onlyExisting && !isOpComplete(re, event)) {
-                  owner.cleanUpOnIncompleteOp(event, re);
-                } else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
-                  BucketRegion br = (BucketRegion) owner;
-                  CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
-                }
-              } // try
-            }
-          } // sync re
-        } // end while
-      } catch (DiskAccessException dae) {
-        // Asif:Feel that it is safe to destroy the region here as there appears
-        // to be no chance of deadlock during region destruction
-        result = null;
-        this._getOwner().handleDiskAccessException(dae);
-        throw dae;
+      setOldValueForDelta(putInfo);
+      try {
+        setOldValueInEvent(putInfo);
+        doCreateOrUpdate(putInfo);
+        return true;
       } finally {
-        if (oqlIndexManager != null) {
-          oqlIndexManager.countDownIndexUpdaters();
+        OffHeapHelper.release(putInfo.getOldValueForDelta());
+        putInfo.setOldValueForDelta(null);
+        if (!putInfo.isCompleted() && putInfo.isCreate()) {
+          // Region entry remove needs to be done while still synced on re.
+          removeEntry(putInfo.getEvent().getKey(), re, false);
         }
-        if (result != null) {
+      }
+    }
+  }
+
+  private IndexManager getInitializedIndexManager() {
+    IndexManager oqlIndexManager;
+    // Fix for Bug #44431. We do NOT want to update the region and wait
+    // later for index INIT as region.clear() can cause inconsistency if
+    // happened in parallel as it also does index INIT.
+    oqlIndexManager = _getOwner().getIndexManager();
+    if (oqlIndexManager != null) {
+      oqlIndexManager.waitForIndexInit();
+    }
+    return oqlIndexManager;
+  }
+
+  private void doAfterPut(RegionMapPutContext putInfo) {
+    if (putInfo.isCompleted()) {
+      final LocalRegion owner = _getOwner();
+      try {
+        final boolean invokeListeners = putInfo.getEvent().basicGetNewValue() != Token.TOMBSTONE;
+        owner.basicPutPart3(putInfo.getEvent(), putInfo.getRegionEntry(),
+            !putInfo.isUninitialized(), putInfo.getLastModifiedTime(), invokeListeners,
+            putInfo.isIfNew(), putInfo.isIfOld(), putInfo.getExpectedOldValue(),
+            putInfo.isRequireOldValue());
+      } finally {
+        if (!putInfo.getClearOccured()) {
           try {
-            // Note we do distribution after releasing all sync to avoid deadlock
-            final boolean invokeListeners = event.basicGetNewValue() != Token.TOMBSTONE;
-            owner.basicPutPart3(event, result, !uninitialized, lastModifiedTime, invokeListeners,
-                ifNew, ifOld, expectedOldValue, requireOldValue);
-          } finally {
-            // bug 32589, post update may throw an exception if exception occurs
-            // for any recipients
-            if (!clearOccured) {
-              try {
-                lruUpdateCallback();
-              } catch (DiskAccessException dae) {
-                // Asif:Feel that it is safe to destroy the region here as there appears
-                // to be no chance of deadlock during region destruction
-                result = null;
-                this._getOwner().handleDiskAccessException(dae);
-                throw dae;
-              }
-            }
-          } // finally
-        } else {
-          resetThreadLocals();
+            lruUpdateCallback();
+          } catch (DiskAccessException dae) {
+            owner.handleDiskAccessException(dae);
+            throw dae;
+          }
         }
       }
+    } else {
+      resetThreadLocals();
+    }
+  }
+
+  /**
+   * @return false if an early out check indicated that
+   *         the put should not be done.
+   */
+  private boolean shouldPutContinue(final RegionMapPutContext putInfo) {
+    if (continueUpdate(putInfo) && continueOverwriteDestroyed(putInfo)
+        && satisfiesExpectedOldValue(putInfo)) {
+      return true;
+    }
+    return false;
+  }
+
+  private void doCreateOrUpdate(final RegionMapPutContext putInfo) {
+    if (!shouldPutContinue(putInfo)) {
+      return;
+    }
+    invokeCacheWriter(putInfo);
+
+    runWithIndexUpdatingInProgress(putInfo, () -> {
+      final EntryEventImpl event = putInfo.getEvent();
+      createOrUpdateEntry(putInfo);
+      if (putInfo.isUninitialized()) {
+        event.inhibitCacheListenerNotification(true);
+      }
+      updateLru(putInfo);
+
+      final RegionEntry re = putInfo.getRegionEntry();
+      long lastModTime = _getOwner().basicPutPart2(event, re, !putInfo.isUninitialized(),
+          putInfo.getLastModifiedTime(), putInfo.getClearOccured());
+      putInfo.setLastModifiedTime(lastModTime);
+      putInfo.setCompleted(true);
+    });
+  }
+
+  private void runWithIndexUpdatingInProgress(RegionMapPutContext putInfo, Runnable r) {
+    final RegionEntry re = putInfo.getRegionEntry();
+    notifyIndex(re, true);
+    try {
+      r.run();
     } finally {
-      releaseCacheModificationLock(owner, event);
+      notifyIndex(re, false);
+    }
+  }
+
+  private void createOrUpdateEntry(final RegionMapPutContext putInfo) {
+    final EntryEventImpl event = putInfo.getEvent();
+    final LocalRegion owner = _getOwner();
+    try {
+      if (isUpdate(putInfo)) {
+        updateEntry(putInfo);
+      } else {
+        createEntry(putInfo);
+      }
+      owner.recordEvent(event);
+    } catch (RegionClearedException rce) {
+      putInfo.setClearOccured(true);
+      owner.recordEvent(event);
+    } catch (ConcurrentCacheModificationException ccme) {
+      VersionTag tag = event.getVersionTag();
+      if (tag != null && tag.isTimeStampUpdated()) {
+        owner.notifyTimestampsToGateways(event);
+      }
+      throw ccme;
+    }
+  }
+
+  private boolean isUpdate(final RegionMapPutContext putInfo) {
+    if (putInfo.isCacheWrite() && putInfo.getEvent().getOperation().isUpdate()) {
+      // if there is a cacheWriter, type of event has already been set
+      return true;
+    }
+    if (putInfo.isReplaceOnClient()) {
+      return true;
+    }
+    if (!putInfo.getRegionEntry().isRemoved()) {
+      return true;
+    }
+    return false;
+  }
+
+  private void setOldValueForDelta(final RegionMapPutContext putInfo) {
+    if (putInfo.isRetrieveOldValueForDelta()) {
+      runWhileEvictionDisabled(() -> {
+        // Old value is faulted in from disk if not found in memory.
+        putInfo.setOldValueForDelta(putInfo.getRegionEntry().getValue(_getOwner()));
+        // OFFHEAP: if we are synced on region entry no issue since we can use ARE's ref
+      });
+    }
+  }
+
+  private void runWhileEvictionDisabled(Runnable r) {
+    final boolean disabled = disableLruUpdateCallback();
+    try {
+      r.run();
+    } finally {
+      if (disabled) {
+        enableLruUpdateCallback();
+      }
     }
-    return result;
   }
 
   /**
-   * If the value in the VM is still REMOVED_PHASE1 Token, then the operation was not completed (due
-   * to cacheWriter exception, concurrentMap operation) etc.
+   * If the re goes into removed2 state, it will be removed from the map.
+   *
+   * @return true if re was remove phase 2
    */
-  private boolean isOpComplete(RegionEntry re, EntryEventImpl event) {
-    if (re.getValueAsToken() == Token.REMOVED_PHASE1) {
+  private boolean isRegionEntryRemoved(final RegionMapPutContext putInfo) {
+    final RegionEntry re = putInfo.getRegionEntry();
+    if (re.isRemovedPhase2()) {
+      _getOwner().getCachePerfStats().incRetries();
+      getEntryMap().remove(putInfo.getEvent().getKey(), re);
+      return true;
+    } else {
       return false;
     }
-    return true;
   }
 
-  private boolean satisfiesExpectedOldValue(EntryEventImpl event, RegionEntry re,
-      Object expectedOldValue, boolean replaceOnClient) {
+  private boolean satisfiesExpectedOldValue(final RegionMapPutContext putInfo) {
     // replace is propagated to server, so no need to check
     // satisfiesOldValue on client
-    if (expectedOldValue != null && !replaceOnClient) {
+    final EntryEventImpl event = putInfo.getEvent();
+    if (putInfo.getExpectedOldValue() != null && !putInfo.isReplaceOnClient()) {
       assert event.getOperation().guaranteesOldValue();
       // We already called setOldValueInEvent so the event will have the old value.
       @Unretained
@@ -2445,7 +2470,8 @@ public abstract class AbstractRegionMap
       // Note that v will be null instead of INVALID because setOldValue
       // converts INVALID to null.
       // But checkExpectedOldValue handle this and says INVALID equals null.
-      if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v, event.getRegion())) {
+      if (!AbstractRegionEntry.checkExpectedOldValue(putInfo.getExpectedOldValue(), v,
+          event.getRegion())) {
         return false;
       }
     }
@@ -2453,10 +2479,12 @@ public abstract class AbstractRegionMap
   }
 
   // PRECONDITION: caller must be synced on re
-  private void setOldValueInEvent(EntryEventImpl event, RegionEntry re, boolean cacheWrite,
-      boolean requireOldValue) {
-    boolean needToSetOldValue =
-        cacheWrite || requireOldValue || event.getOperation().guaranteesOldValue();
+  private void setOldValueInEvent(final RegionMapPutContext putInfo) {
+    final EntryEventImpl event = putInfo.getEvent();
+    final RegionEntry re = putInfo.getRegionEntry();
+    event.setRegionEntry(re);
+    boolean needToSetOldValue = putInfo.isCacheWrite() || putInfo.isRequireOldValue()
+        || event.getOperation().guaranteesOldValue();
     if (needToSetOldValue) {
       if (event.getOperation().guaranteesOldValue()) {
         // In these cases we want to even get the old value from disk if it is not in memory
@@ -2502,8 +2530,10 @@ public abstract class AbstractRegionMap
     }
   }
 
-  protected void createEntry(EntryEventImpl event, final LocalRegion owner, RegionEntry re)
-      throws RegionClearedException {
+  private void createEntry(final RegionMapPutContext putInfo) throws RegionClearedException {
+    final LocalRegion owner = _getOwner();
+    final EntryEventImpl event = putInfo.getEvent();
+    final RegionEntry re = putInfo.getRegionEntry();
     final boolean wasTombstone = re.isTombstone();
     processVersionTag(re, event);
     event.putNewEntry(owner, re);
@@ -2513,22 +2543,24 @@ public abstract class AbstractRegionMap
     }
   }
 
-  protected void updateEntry(EntryEventImpl event, boolean requireOldValue, Object oldValueForDelta,
-      RegionEntry re) throws RegionClearedException {
-    final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
+  private void updateEntry(final RegionMapPutContext putInfo) throws RegionClearedException {
+    final EntryEventImpl event = putInfo.getEvent();
+    final RegionEntry re = putInfo.getRegionEntry();
     final boolean wasTombstone = re.isTombstone();
+    final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
     processVersionTag(re, event);
-    event.putExistingEntry(event.getRegion(), re, requireOldValue, oldValueForDelta);
+    event.putExistingEntry(event.getRegion(), re, putInfo.isRequireOldValue(),
+        putInfo.getOldValueForDelta());
     EntryLogger.logPut(event);
     updateSize(event, oldSize, true/* isUpdate */, wasTombstone);
   }
 
-  private void updateLru(boolean clearOccured, RegionEntry re, EntryEventImpl event) {
-    if (!clearOccured) {
-      if (event.getOperation().isCreate()) {
-        lruEntryCreate(re);
+  private void updateLru(final RegionMapPutContext putInfo) {
+    if (!putInfo.getClearOccured()) {
+      if (putInfo.getEvent().getOperation().isCreate()) {
+        lruEntryCreate(putInfo.getRegionEntry());
       } else {
-        lruEntryUpdate(re);
+        lruEntryUpdate(putInfo.getRegionEntry());
       }
     }
   }
@@ -2551,52 +2583,50 @@ public abstract class AbstractRegionMap
     }
   }
 
-  private void invokeCacheWriter(RegionEntry re, EntryEventImpl event, boolean cacheWrite,
-      CacheWriter cacheWriter, Set netWriteRecipients, boolean requireOldValue,
-      Object expectedOldValue, boolean replaceOnClient) {
+  private void invokeCacheWriter(RegionMapPutContext putInfo) {
+    final EntryEventImpl event = putInfo.getEvent();
     // invoke listeners only if region is initialized
-    if (_getOwner().isInitialized() && cacheWrite) {
+    if (_getOwner().isInitialized() && putInfo.isCacheWrite()) {
       // event.setOldValue already called in setOldValueInEvent
 
       // bug #42638 for replaceOnClient, do not make the event create
       // or update since replace must propagate to server
-      if (!replaceOnClient) {
-        if (re.isDestroyedOrRemoved()) {
+      if (!putInfo.isReplaceOnClient()) {
+        if (putInfo.getRegionEntry().isDestroyedOrRemoved()) {
           event.makeCreate();
         } else {
           event.makeUpdate();
         }
       }
-      _getOwner().cacheWriteBeforePut(event, netWriteRecipients, cacheWriter, requireOldValue,
-          expectedOldValue);
+      _getOwner().cacheWriteBeforePut(event, putInfo.getNetWriteRecipients(),
+          putInfo.getCacheWriter(), putInfo.isRequireOldValue(), putInfo.getExpectedOldValue());
     }
-    if (!_getOwner().isInitialized() && !cacheWrite) {
-      // block setting of old value in putNewValueNoSync, don't
-      // need it
+    if (!_getOwner().isInitialized() && !putInfo.isCacheWrite()) {
+      // block setting of old value in putNewValueNoSync, don't need it
       event.oldValueNotAvailable();
     }
   }
 
-  private boolean continueOverwriteDestroyed(RegionEntry re, EntryEventImpl event,
-      boolean overwriteDestroyed, boolean ifNew) {
-    Token oldValueInVM = re.getValueAsToken();
+  private boolean continueOverwriteDestroyed(final RegionMapPutContext putInfo) {
+    Token oldValueInVM = putInfo.getRegionEntry().getValueAsToken();
     // if region is under GII, check if token is destroyed
-    if (!overwriteDestroyed) {
+    if (!putInfo.isOverwriteDestroyed()) {
       if (!_getOwner().isInitialized()
           && (oldValueInVM == Token.DESTROYED || oldValueInVM == Token.TOMBSTONE)) {
-        event.setOldValueDestroyedToken();
+        putInfo.getEvent().setOldValueDestroyedToken();
         return false;
       }
     }
-    if (ifNew && !Token.isRemoved(oldValueInVM)) {
+    if (putInfo.isIfNew() && !Token.isRemoved(oldValueInVM)) {
       return false;
     }
     return true;
   }
 
-  private boolean continueUpdate(RegionEntry re, EntryEventImpl event, boolean ifOld,
-      boolean replaceOnClient) {
-    if (ifOld) {
+  private boolean continueUpdate(final RegionMapPutContext putInfo) {
+    if (putInfo.isIfOld()) {
+      final EntryEventImpl event = putInfo.getEvent();
+      final RegionEntry re = putInfo.getRegionEntry();
       // only update, so just do tombstone maintainence and exit
       if (re.isTombstone() && event.getVersionTag() != null) {
         // refresh the tombstone so it doesn't time out too soon
@@ -2610,7 +2640,7 @@ public abstract class AbstractRegionMap
         _getOwner().rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
         return false;
       }
-      if (re.isRemoved() && !replaceOnClient) {
+      if (re.isRemoved() && !putInfo.isReplaceOnClient()) {
         return false;
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index acb5e06..45a896f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -7322,11 +7322,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
-  void cleanUpOnIncompleteOp(EntryEventImpl event, RegionEntry regionEntry) {
-    // Ok to remove entry as index has not been modified yet by the operation
-    this.entries.removeEntry(event.getKey(), regionEntry, false);
-  }
-
   public static void validateRegionName(String name, InternalRegionArguments internalRegionArgs) {
     if (name == null) {
       throw new IllegalArgumentException(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java
new file mode 100644
index 0000000..f8a2778
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapPutContext.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import java.util.Set;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.offheap.annotations.Released;
+
+/**
+ * This class is used to hold a bunch of immutable items
+ * used by AbstractRegionMap when doing a "put" operation.
+ * It also has a few mutable items updates while a "put"
+ * operation is being done.
+ */
+public class RegionMapPutContext {
+  private final EntryEventImpl event;
+  private final boolean ifNew;
+  private final boolean ifOld;
+  private final boolean overwriteDestroyed;
+  private final boolean requireOldValue;
+  private final boolean uninitialized;
+  private final boolean retrieveOldValueForDelta;
+  private final boolean replaceOnClient;
+  private final boolean onlyExisting;
+  private final boolean cacheWrite;
+  private final CacheWriter cacheWriter;
+  private final Set netWriteRecipients;
+  private final Object expectedOldValue;
+  private boolean clearOccured;
+  private long lastModifiedTime;
+  private RegionEntry regionEntry;
+  private boolean create;
+  private boolean completed;
+  @Released
+  private Object oldValueForDelta;
+
+  public RegionMapPutContext(LocalRegion owner, EntryEventImpl event, boolean ifNew, boolean ifOld,
+      boolean overwriteDestroyed, boolean requireOldValue,
+
+      Object expectedOldValue) {
+    if (owner == null) {
+      // "fix" for bug 32440
+      Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
+    }
+    this.event = event;
+    this.ifNew = ifNew;
+    this.ifOld = ifOld;
+    this.overwriteDestroyed = overwriteDestroyed;
+    this.requireOldValue = requireOldValue;
+    this.uninitialized = !owner.isInitialized();
+    this.retrieveOldValueForDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null;
+    this.replaceOnClient =
+        event.getOperation() == Operation.REPLACE && owner.getServerProxy() != null;
+    this.onlyExisting = ifOld && !isReplaceOnClient();
+    this.cacheWriter = owner.basicGetWriter();
+    this.cacheWrite = !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
+        && (getCacheWriter() != null || owner.hasServerProxy() || owner.getScope().isDistributed());
+    this.expectedOldValue = expectedOldValue;
+    if (isCacheWrite()) {
+      if (getCacheWriter() == null && owner.getScope().isDistributed()) {
+        this.netWriteRecipients =
+            ((DistributedRegion) owner).getCacheDistributionAdvisor().adviseNetWrite();
+      } else {
+        this.netWriteRecipients = null;
+      }
+    } else {
+      this.netWriteRecipients = null;
+    }
+  }
+
+  public boolean isIfNew() {
+    return ifNew;
+  }
+
+  public boolean isIfOld() {
+    return ifOld;
+  }
+
+  public boolean isOverwriteDestroyed() {
+    return overwriteDestroyed;
+  }
+
+  public boolean isRequireOldValue() {
+    return requireOldValue;
+  }
+
+  public boolean isUninitialized() {
+    return uninitialized;
+  }
+
+  public boolean isRetrieveOldValueForDelta() {
+    return retrieveOldValueForDelta;
+  }
+
+  public boolean isReplaceOnClient() {
+    return replaceOnClient;
+  }
+
+  public boolean isOnlyExisting() {
+    return onlyExisting;
+  }
+
+  public boolean isCacheWrite() {
+    return cacheWrite;
+  }
+
+  public CacheWriter getCacheWriter() {
+    return cacheWriter;
+  }
+
+  public Set getNetWriteRecipients() {
+    return netWriteRecipients;
+  }
+
+  public Object getExpectedOldValue() {
+    return expectedOldValue;
+  }
+
+  public boolean getClearOccured() {
+    return clearOccured;
+  }
+
+  public void setClearOccured(boolean clearOccured) {
+    this.clearOccured = clearOccured;
+  }
+
+  public long getLastModifiedTime() {
+    return lastModifiedTime;
+  }
+
+  public void setLastModifiedTime(long lastModifiedTime) {
+    this.lastModifiedTime = lastModifiedTime;
+  }
+
+  public RegionEntry getRegionEntry() {
+    return regionEntry;
+  }
+
+  public void setRegionEntry(RegionEntry regionEntry) {
+    this.regionEntry = regionEntry;
+  }
+
+  /**
+   * @return true if put created a new entry; false if it updated an existing one.
+   */
+  public boolean isCreate() {
+    return create;
+  }
+
+  public void setCreate(boolean v) {
+    this.create = v;
+  }
+
+  public EntryEventImpl getEvent() {
+    return event;
+  }
+
+  public boolean isCompleted() {
+    return this.completed;
+  }
+
+  public void setCompleted(boolean b) {
+    this.completed = b;
+  }
+
+  public Object getOldValueForDelta() {
+    return this.oldValueForDelta;
+  }
+
+  public void setOldValueForDelta(Object value) {
+    this.oldValueForDelta = value;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index d3053b0..ed5ccf2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -30,6 +32,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.After;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -38,10 +47,12 @@ import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.eviction.EvictableEntry;
 import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.eviction.EvictionCounters;
+import org.apache.geode.internal.cache.map.RegionMapPutContext;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionHolder;
 import org.apache.geode.internal.cache.versions.VersionTag;
@@ -105,6 +116,7 @@ public class AbstractRegionMapTest {
 
     // invalidate on region that is not initialized should create
     // entry in map as invalid.
+    when(arm._getOwner().isInitialized()).thenReturn(false);
     assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
         .isInstanceOf(EntryNotFoundException.class);
 
@@ -122,6 +134,7 @@ public class AbstractRegionMapTest {
 
     // invalidate on region that is not initialized should create
     // entry in map as invalid.
+    when(arm._getOwner().isInitialized()).thenReturn(false);
     assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
         .isInstanceOf(EntryNotFoundException.class);
 
@@ -158,6 +171,7 @@ public class AbstractRegionMapTest {
 
       // invalidate on region that is not initialized should create
       // entry in map as invalid.
+      when(arm._getOwner().isInitialized()).thenReturn(false);
       assertTrue(arm.invalidate(event, true, true, false));
       verify(arm._getOwner(), times(1)).basicInvalidatePart2(any(), any(), anyBoolean(),
           anyBoolean());
@@ -728,6 +742,139 @@ public class AbstractRegionMapTest {
     assertEquals(re.getValueAsToken(), token);
   }
 
+  @Test
+  public void updateOnEmptyMapReturnsNull() {
+    final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+    final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+
+    RegionEntry result = arm.basicPut(event, 0L, false, true, null, false, false);
+
+    assertThat(result).isNull();
+    verify(arm._getOwner(), never()).basicPutPart2(any(), any(), anyBoolean(), anyLong(),
+        anyBoolean());
+    verify(arm._getOwner(), never()).basicPutPart3(any(), any(), anyBoolean(), anyLong(),
+        anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
+  }
+
+  @Test
+  public void createOnExistingEntryReturnsNull() {
+    final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+    // do a create to get a region entry in the map
+    arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true, false, null, false, false);
+    final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+    final boolean ifNew = true;
+    final boolean ifOld = false;
+    final boolean requireOldValue = false;
+    final Object expectedOldValue = null;
+
+    RegionEntry result =
+        arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+    assertThat(result).isNull();
+    verify(arm._getOwner(), never()).basicPutPart2(eq(event), any(), anyBoolean(), anyLong(),
+        anyBoolean());
+    verify(arm._getOwner(), never()).basicPutPart3(eq(event), any(), anyBoolean(), anyLong(),
+        anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
+  }
+
+  @Test
+  public void createOnEntryReturnedFromPutIfAbsentDoesNothing() {
+    CustomEntryConcurrentHashMap<String, RegionEntry> map =
+        mock(CustomEntryConcurrentHashMap.class);
+    RegionEntry entry = mock(RegionEntry.class);
+    when(entry.getValueAsToken()).thenReturn(Token.NOT_A_TOKEN);
+    when(map.get(KEY)).thenReturn(null);
+    when(map.putIfAbsent((String) eq(KEY), any())).thenReturn(entry);
+    final TestableAbstractRegionMap arm = new TestableAbstractRegionMap(false, map, null);
+    final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+    final boolean ifNew = true;
+    final boolean ifOld = false;
+    final boolean requireOldValue = false;
+    final Object expectedOldValue = null;
+
+    RegionEntry result =
+        arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+    assertThat(result).isNull();
+    verify(arm._getOwner(), never()).basicPutPart2(eq(event), any(), anyBoolean(), anyLong(),
+        anyBoolean());
+    verify(arm._getOwner(), never()).basicPutPart3(eq(event), any(), anyBoolean(), anyLong(),
+        anyBoolean(), anyBoolean(), anyBoolean(), any(), anyBoolean());
+  }
+
+  @Test
+  public void createOnExistingEntryWithRemovePhase2DoesCreate() throws RegionClearedException {
+    final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+    // do a create to get a region entry in the map
+    RegionEntry createdEntry = arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true,
+        false, null, false, false);
+    createdEntry.setValue(null, Token.REMOVED_PHASE2);
+    final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+    event.setNewValue("create");
+    final boolean ifNew = true;
+    final boolean ifOld = false;
+    final boolean requireOldValue = false;
+    final Object expectedOldValue = null;
+
+    RegionEntry result =
+        arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+
+    assertThat(result).isNotNull();
+    assertThat(result).isNotSameAs(createdEntry);
+    assertThat(result.getKey()).isEqualTo("key");
+    assertThat(result.getValue()).isEqualTo("create");
+    verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
+        eq(false));
+    verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
+        eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
+  }
+
+
+  @Test
+  public void updateOnExistingEntryDoesUpdate() {
+    final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+    // do a create to get a region entry in the map
+    RegionEntry createdEntry = arm.basicPut(createEventForCreate(arm._getOwner(), "key"), 0L, true,
+        false, null, false, false);
+    final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+    event.setNewValue("update");
+    final boolean ifNew = false;
+    final boolean ifOld = true;
+    final boolean requireOldValue = false;
+    final Object expectedOldValue = null;
+
+    RegionEntry result =
+        arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+
+    assertThat(result).isSameAs(createdEntry);
+    assertThat(result.getKey()).isEqualTo("key");
+    assertThat(result.getValue()).isEqualTo("update");
+    verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
+        eq(false));
+    verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
+        eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
+  }
+
+  @Test
+  public void createOnEmptyMapAddsEntry() {
+    final TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+    final EntryEventImpl event = createEventForCreate(arm._getOwner(), "key");
+    event.setNewValue("value");
+    final boolean ifNew = true;
+    final boolean ifOld = false;
+    final boolean requireOldValue = false;
+    final Object expectedOldValue = null;
+
+    RegionEntry result =
+        arm.basicPut(event, 0L, ifNew, ifOld, expectedOldValue, requireOldValue, false);
+
+    assertThat(result).isNotNull();
+    assertThat(result.getKey()).isEqualTo("key");
+    assertThat(result.getValue()).isEqualTo("value");
+    verify(arm._getOwner(), times(1)).basicPutPart2(eq(event), eq(result), eq(true), anyLong(),
+        eq(false));
+    verify(arm._getOwner(), times(1)).basicPutPart3(eq(event), eq(result), eq(true), anyLong(),
+        eq(true), eq(ifNew), eq(ifOld), eq(expectedOldValue), eq(requireOldValue));
+  }
+
   /**
    * TestableAbstractRegionMap
    */
@@ -749,6 +896,8 @@ public class AbstractRegionMapTest {
       when(owner.getCachePerfStats()).thenReturn(cachePerfStats);
       when(owner.getConcurrencyChecksEnabled()).thenReturn(withConcurrencyChecks);
       when(owner.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+      when(owner.getScope()).thenReturn(Scope.LOCAL);
+      when(owner.isInitialized()).thenReturn(true);
       doThrow(EntryNotFoundException.class).when(owner).checkEntryNotFound(any());
       initialize(owner, new Attributes(), null, false);
       if (map != null) {
@@ -760,6 +909,88 @@ public class AbstractRegionMapTest {
     }
   }
 
+  @Test
+  public void verifyConcurrentCreateHasCorrectResult() throws Exception {
+    CountDownLatch firstCreateAddedUninitializedEntry = new CountDownLatch(1);
+    CountDownLatch secondCreateFoundFirstCreatesEntry = new CountDownLatch(1);
+    TestableBasicPutMap arm = new TestableBasicPutMap(firstCreateAddedUninitializedEntry,
+        secondCreateFoundFirstCreatesEntry);
+    // The key needs to be long enough to not be stored inline on the region entry.
+    String key1 = "lonGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGkey";
+    String key2 = new String(key1);
+
+    Future<RegionEntry> future = doFirstCreateInAnotherThread(arm, key1);
+    if (!firstCreateAddedUninitializedEntry.await(5, TimeUnit.SECONDS)) {
+      // something is wrong with the other thread
+      // so count down the latch it may be waiting
+      // on and then call get to see what went wrong with him.
+      secondCreateFoundFirstCreatesEntry.countDown();
+      fail("other thread took too long. It returned " + future.get());
+    }
+    EntryEventImpl event = createEventForCreate(arm._getOwner(), key2);
+    // now do the second create
+    RegionEntry result = arm.basicPut(event, 0L, true, false, null, false, false);
+
+    RegionEntry resultFromOtherThread = future.get();
+
+    assertThat(result).isNull();
+    assertThat(resultFromOtherThread).isNotNull();
+    assertThat(resultFromOtherThread.getKey()).isSameAs(key1);
+  }
+
+  private EntryEventImpl createEventForCreate(LocalRegion lr, String key) {
+    when(lr.getKeyInfo(key)).thenReturn(new KeyInfo(key, null, null));
+    EntryEventImpl event =
+        EntryEventImpl.create(lr, Operation.CREATE, key, false, null, true, false);
+    event.setNewValue("create_value");
+    return event;
+  }
+
+  private Future<RegionEntry> doFirstCreateInAnotherThread(TestableBasicPutMap arm, String key) {
+    Future<RegionEntry> result = CompletableFuture.supplyAsync(() -> {
+      EntryEventImpl event = createEventForCreate(arm._getOwner(), key);
+      return arm.basicPut(event, 0L, true, false, null, false, false);
+    });
+    return result;
+  }
+
+  private static class TestableBasicPutMap extends TestableAbstractRegionMap {
+    private final CountDownLatch firstCreateAddedUninitializedEntry;
+    private final CountDownLatch secondCreateFoundFirstCreatesEntry;
+    private boolean alreadyCalledPutIfAbsentNewEntry;
+    private boolean alreadyCalledAddRegionEntryToMapAndDoPut;
+
+    public TestableBasicPutMap(CountDownLatch removePhase1Completed,
+        CountDownLatch secondCreateFoundFirstCreatesEntry) {
+      super();
+      this.firstCreateAddedUninitializedEntry = removePhase1Completed;
+      this.secondCreateFoundFirstCreatesEntry = secondCreateFoundFirstCreatesEntry;
+    }
+
+    @Override
+    protected boolean addRegionEntryToMapAndDoPut(final RegionMapPutContext putInfo) {
+      if (!alreadyCalledAddRegionEntryToMapAndDoPut) {
+        alreadyCalledAddRegionEntryToMapAndDoPut = true;
+      } else {
+        this.secondCreateFoundFirstCreatesEntry.countDown();
+      }
+      return super.addRegionEntryToMapAndDoPut(putInfo);
+    }
+
+    @Override
+    protected void putIfAbsentNewEntry(final RegionMapPutContext putInfo) {
+      super.putIfAbsentNewEntry(putInfo);
+      if (!alreadyCalledPutIfAbsentNewEntry) {
+        alreadyCalledPutIfAbsentNewEntry = true;
+        this.firstCreateAddedUninitializedEntry.countDown();
+        try {
+          this.secondCreateFoundFirstCreatesEntry.await(5, TimeUnit.SECONDS);
+        } catch (InterruptedException ignore) {
+        }
+      }
+    }
+  }
+
   /**
    * TestableVMLRURegionMap
    */

-- 
To stop receiving notification emails like this one, please contact
dschneider@apache.org.