You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/01/31 19:06:59 UTC

[geode] branch develop updated: GEODE-4364: extract RegionMapDestroy and add RegionMapDestroyTest (#1347)

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 21e2e0a  GEODE-4364: extract RegionMapDestroy and add RegionMapDestroyTest (#1347)
21e2e0a is described below

commit 21e2e0ae73a155b0c072e385544f3fb0d382c4e0
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Wed Jan 31 11:06:55 2018 -0800

    GEODE-4364: extract RegionMapDestroy and add RegionMapDestroyTest (#1347)
---
 .../internal/cache/AbstractBucketRegionQueue.java  |   2 +-
 .../geode/internal/cache/AbstractDiskRegion.java   |  96 ++-
 .../geode/internal/cache/AbstractRegion.java       |  28 +-
 .../geode/internal/cache/AbstractRegionMap.java    | 748 +++------------------
 .../apache/geode/internal/cache/BucketRegion.java  |   4 +-
 .../geode/internal/cache/DistributedRegion.java    |   2 +-
 .../geode/internal/cache/EntryEventImpl.java       |   6 +-
 .../geode/internal/cache/EvictableRegion.java      |   3 +-
 .../{VMRegionMap.java => InternalEntryEvent.java}  |  16 +-
 .../geode/internal/cache/InternalRegion.java       |  35 +-
 .../apache/geode/internal/cache/LocalRegion.java   |  28 +-
 .../cache/OfflineCompactionDiskRegion.java         |  16 +-
 .../internal/cache/PlaceHolderDiskRegion.java      |  27 +-
 .../geode/internal/cache/ProxyRegionMap.java       |  18 +-
 .../org/apache/geode/internal/cache/RegionMap.java |   9 +-
 .../{VMRegionMap.java => RegionMapOwner.java}      |  15 +-
 .../geode/internal/cache/VMLRURegionMap.java       |  32 +-
 .../apache/geode/internal/cache/VMRegionMap.java   |   2 +-
 .../geode/internal/cache/ValidatingDiskRegion.java |   5 -
 .../cache/entries/AbstractRegionEntry.java         |   2 +-
 .../geode/internal/cache/entries/DiskEntry.java    |   5 +-
 .../HashRegionEntry.java}                          |  18 +-
 .../CacheModificationLock.java}                    |  23 +-
 .../geode/internal/cache/map/FocusedRegionMap.java |  45 ++
 .../geode/internal/cache/map/RegionMapDestroy.java | 650 ++++++++++++++++++
 .../cache/persistence/DiskRecoveryStore.java       |   2 -
 .../ConcurrentMapWithReusableEntries.java          |  41 ++
 .../concurrent/CustomEntryConcurrentHashMap.java   |  20 +-
 .../internal/cache/ARMLockTestHookAdapter.java     |  18 +-
 .../internal/cache/AbstractRegionJUnitTest.java    | 668 +-----------------
 .../internal/cache/AbstractRegionMapTest.java      | 214 +++---
 .../internal/cache/ClearRvvLockingDUnitTest.java   |   8 +-
 .../internal/cache/ClearTXLockingDUnitTest.java    |   2 +-
 .../cache/ClientServerTransactionDUnitTest.java    |   4 +-
 ...stroyEntryWithConcurrentOperationJUnitTest.java | 229 -------
 .../DestroyEntryDuringCloseIntegrationTest.java    |  97 +++
 .../internal/cache/map/RegionMapDestroyTest.java   | 655 ++++++++++++++++++
 37 files changed, 1969 insertions(+), 1824 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 26cc6a5..8991735 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -137,7 +137,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
   }
 
   @Override
-  protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
+  public void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
     /**
      * We are doing local destroy on this bucket. No need to send destroy operation to remote nodes.
      */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegion.java
index 6c993db..1d7a79f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegion.java
@@ -29,7 +29,6 @@ import org.apache.geode.StatisticsFactory;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAlgorithm;
 import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.Region;
 import org.apache.geode.compression.Compressor;
 import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.internal.CopyOnWriteHashSet;
@@ -46,6 +45,7 @@ import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
 import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
 
 /**
@@ -236,8 +236,10 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     this.offHeap = drv.getOffHeap();
   }
 
+  @Override
   public abstract String getName();
 
+  @Override
   public DiskStoreImpl getDiskStore() {
     return this.ds;
   }
@@ -248,26 +250,32 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     this.versionVector = new DiskRegionVersionVector(ds.getDiskStoreID());
   }
 
+  @Override
   public long getId() {
     return this.id;
   }
 
+  @Override
   public long getClearOplogEntryId() {
     return this.clearOplogEntryId;
   }
 
+  @Override
   public void setClearOplogEntryId(long v) {
     this.clearOplogEntryId = v;
   }
 
+  @Override
   public RegionVersionVector getClearRVV() {
     return this.clearRVV;
   }
 
+  @Override
   public void setClearRVV(RegionVersionVector rvv) {
     this.clearRVV = rvv;
   }
 
+  @Override
   public void setConfig(byte lruAlgorithm, byte lruAction, int lruLimit, int concurrencyLevel,
       int initialCapacity, float loadFactor, boolean statisticsEnabled, boolean isBucket,
       EnumSet<DiskRegionFlag> flags, String partitionName, int startingBucketId,
@@ -321,6 +329,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
         .setAction(getActualLruAction()).setMaximum(getLruLimit());
   }
 
+  @Override
   public byte getLruAlgorithm() {
     return this.lruAlgorithm;
   }
@@ -329,6 +338,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     return EvictionAlgorithm.parseValue(getLruAlgorithm());
   }
 
+  @Override
   public byte getLruAction() {
     return this.lruAction;
   }
@@ -337,26 +347,32 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     return EvictionAction.parseValue(getLruAction());
   }
 
+  @Override
   public int getLruLimit() {
     return this.lruLimit;
   }
 
+  @Override
   public int getConcurrencyLevel() {
     return this.concurrencyLevel;
   }
 
+  @Override
   public int getInitialCapacity() {
     return this.initialCapacity;
   }
 
+  @Override
   public float getLoadFactor() {
     return this.loadFactor;
   }
 
+  @Override
   public boolean getStatisticsEnabled() {
     return this.statisticsEnabled;
   }
 
+  @Override
   public boolean isBucket() {
     return this.isBucket;
   }
@@ -366,10 +382,12 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     return this.flags;
   }
 
+  @Override
   public String getPartitionName() {
     return this.partitionName;
   }
 
+  @Override
   public int getStartingBucketId() {
     return this.startingBucketId;
   }
@@ -386,6 +404,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
   private final CopyOnWriteHashSet<PersistentMemberID> offlineMembers;
   private final CopyOnWriteHashSet<PersistentMemberID> equalMembers;
 
+  @Override
   public PersistentMemberID addMyInitializingPMID(PersistentMemberID pmid) {
     PersistentMemberID result = this.myInitializingId;
     this.myInitializingId = pmid;
@@ -395,44 +414,54 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     return result;
   }
 
+  @Override
   public void markInitialized() {
     assert this.myInitializingId != null;
     this.myInitializedId = this.myInitializingId;
     this.myInitializingId = null;
   }
 
+  @Override
   public boolean addOnlineMember(PersistentMemberID pmid) {
     return this.onlineMembers.add(pmid);
   }
 
+  @Override
   public boolean addOfflineMember(PersistentMemberID pmid) {
     return this.offlineMembers.add(pmid);
   }
 
+  @Override
   public boolean addOfflineAndEqualMember(PersistentMemberID pmid) {
     return this.equalMembers.add(pmid);
   }
 
+  @Override
   public boolean rmOnlineMember(PersistentMemberID pmid) {
     return this.onlineMembers.remove(pmid);
   }
 
+  @Override
   public boolean rmOfflineMember(PersistentMemberID pmid) {
     return this.offlineMembers.remove(pmid);
   }
 
+  @Override
   public boolean rmEqualMember(PersistentMemberID pmid) {
     return this.equalMembers.remove(pmid);
   }
 
+  @Override
   public void markBeginDestroyRegion() {
     this.aboutToDestroy = true;
   }
 
+  @Override
   public void markBeginDestroyDataStorage() {
     this.aboutToDestroyDataStorage = true;
   }
 
+  @Override
   public void markEndDestroyRegion() {
     this.onlineMembers.clear();
     this.offlineMembers.clear();
@@ -443,6 +472,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     this.isRecreated = false;
   }
 
+  @Override
   public void markEndDestroyDataStorage() {
     this.myInitializedId = null;
     this.myInitializingId = null;
@@ -450,6 +480,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
   }
 
   // PersistentMemberView methods
+  @Override
   public PersistentMemberID getMyInitializingID() {
     DiskInitFile dif = this.ds.getDiskInitFile();
     if (dif == null)
@@ -459,6 +490,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public PersistentMemberID getMyPersistentID() {
     DiskInitFile dif = this.ds.getDiskInitFile();
     if (dif == null)
@@ -468,6 +500,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public Set<PersistentMemberID> getOnlineMembers() {
     DiskInitFile dif = this.ds.getDiskInitFile();
     if (dif == null)
@@ -477,6 +510,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public Set<PersistentMemberID> getOfflineMembers() {
     DiskInitFile dif = this.ds.getDiskInitFile();
     if (dif == null)
@@ -486,6 +520,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public Set<PersistentMemberID> getOfflineAndEqualMembers() {
     DiskInitFile dif = this.ds.getDiskInitFile();
     if (dif == null)
@@ -495,11 +530,13 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public Set<PersistentMemberPattern> getRevokedMembers() {
     DiskInitFile dif = this.ds.getDiskInitFile();
     return ds.getRevokedMembers();
   }
 
+  @Override
   public void memberOffline(PersistentMemberID persistentID) {
     this.ds.memberOffline(this, persistentID);
     if (logger.isDebugEnabled()) {
@@ -508,6 +545,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public void memberOfflineAndEqual(PersistentMemberID persistentID) {
     this.ds.memberOfflineAndEqual(this, persistentID);
     if (logger.isDebugEnabled()) {
@@ -516,6 +554,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public void memberOnline(PersistentMemberID persistentID) {
     this.ds.memberOnline(this, persistentID);
     if (logger.isDebugEnabled()) {
@@ -524,6 +563,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public void memberRemoved(PersistentMemberID persistentID) {
     this.ds.memberRemoved(this, persistentID);
     if (logger.isDebugEnabled()) {
@@ -532,6 +572,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public void memberRevoked(PersistentMemberPattern revokedPattern) {
     this.ds.memberRevoked(revokedPattern);
     if (logger.isDebugEnabled()) {
@@ -540,6 +581,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public void setInitializing(PersistentMemberID newId) {
     this.ds.setInitializing(this, newId);
     if (logger.isDebugEnabled()) {
@@ -548,6 +590,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public void setInitialized() {
     this.ds.setInitialized(this);
     if (logger.isDebugEnabled()) {
@@ -556,33 +599,36 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public PersistentMemberID generatePersistentID() {
     return this.ds.generatePersistentID(this);
   }
 
+  @Override
   public boolean isRecreated() {
     return this.isRecreated;
   }
 
+  @Override
   public boolean hasConfigChanged() {
     return this.configChanged;
   }
 
+  @Override
   public void setConfigChanged(boolean v) {
     this.configChanged = v;
   }
 
+  @Override
   public void endDestroy(LocalRegion region) {
     // Clean up the state if we were ready to recover this region
     if (isReadyForRecovery()) {
       ds.updateDiskRegion(this);
-      this.entriesMapIncompatible = false;
-      if (this.entries != null) {
-        CustomEntryConcurrentHashMap<Object, Object> other =
-            ((AbstractRegionMap) this.entries)._getMap();
-        Iterator<Map.Entry<Object, Object>> it = other.entrySetWithReusableEntries().iterator();
-        while (it.hasNext()) {
-          Map.Entry<Object, Object> me = it.next();
+      entriesMapIncompatible = false;
+      if (entries != null) {
+        ConcurrentMapWithReusableEntries<Object, Object> other =
+            entries.getCustomEntryConcurrentHashMap();
+        for (Map.Entry<Object, Object> me : other.entrySetWithReusableEntries()) {
           RegionEntry oldRe = (RegionEntry) me.getValue();
           if (oldRe instanceof OffHeapRegionEntry) {
             ((OffHeapRegionEntry) oldRe).release();
@@ -592,29 +638,29 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
           }
         }
       }
-      this.entries = null;
-      this.readyForRecovery = false;
+      entries = null;
+      readyForRecovery = false;
     }
 
-    if (this.aboutToDestroyDataStorage) {
-      this.ds.endDestroyDataStorage(region, (DiskRegion) this);
+    if (aboutToDestroyDataStorage) {
+      ds.endDestroyDataStorage(region, (DiskRegion) this);
       if (logger.isDebugEnabled()) {
         logger.trace(LogMarker.PERSIST, "PersistentView {} - {} - endDestroyDataStorage: {}",
             getDiskStoreID().abbrev(), this.getName(), getMyPersistentID());
       }
     } else {
-      this.ds.endDestroyRegion(region, (DiskRegion) this);
+      ds.endDestroyRegion(region, (DiskRegion) this);
       if (logger.isDebugEnabled()) {
         logger.trace(LogMarker.PERSIST, "PersistentView {} - {} - endDestroy: {}",
             getDiskStoreID().abbrev(), this.getName(), getMyPersistentID());
       }
     }
-
   }
 
   /**
    * Begin the destroy of everything related to this disk region.
    */
+  @Override
   public void beginDestroy(LocalRegion region) {
     beginDestroyRegion(region);
     if (logger.isDebugEnabled()) {
@@ -630,6 +676,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
    * Destroy the data storage this this disk region. Destroying the data storage leaves the
    * persistent view, but removes the data.
    */
+  @Override
   public void beginDestroyDataStorage() {
     this.ds.beginDestroyDataStorage((DiskRegion) this);
     if (logger.isDebugEnabled()) {
@@ -640,10 +687,12 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
 
   public void createDataStorage() {}
 
+  @Override
   public boolean wasAboutToDestroy() {
     return this.aboutToDestroy;
   }
 
+  @Override
   public boolean wasAboutToDestroyDataStorage() {
     return this.aboutToDestroyDataStorage;
   }
@@ -702,6 +751,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public void copyExistingRegionMap(LocalRegion lr) {
     waitForRecoveryCompletion();
     if (this.entriesMapIncompatible) {
@@ -728,6 +778,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     this.entries = rm;
   }
 
+  @Override
   public RegionMap getRecoveredEntryMap() {
     return this.entries;
   }
@@ -736,11 +787,13 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     this.readyForRecovery = false;
   }
 
+  @Override
   public boolean isReadyForRecovery() {
     // better name for this method would be isRecovering
     return this.readyForRecovery;
   }
 
+  @Override
   public void prepareForRecovery() {
     this.readyForRecovery = true;
   }
@@ -750,10 +803,12 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
    *
    * @since GemFire 3.2.1
    */
+  @Override
   public int getRecoveredEntryCount() {
     return this.recoveredEntryCount;
   }
 
+  @Override
   public void incRecoveredEntryCount() {
     this.recoveredEntryCount++;
   }
@@ -761,6 +816,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
   /**
    * initializes the number of entries recovered
    */
+  @Override
   public void initRecoveredEntryCount() {
     if (this.recoveryCompleted != null) {
       synchronized (this.recoveryCompleted) {
@@ -772,20 +828,24 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
 
   protected final AtomicLong numOverflowOnDisk;
 
+  @Override
   public long getNumOverflowOnDisk() {
     return this.numOverflowOnDisk.get();
   }
 
+  @Override
   public void incNumOverflowOnDisk(long delta) {
     this.numOverflowOnDisk.addAndGet(delta);
   }
 
   protected final AtomicLong numOverflowBytesOnDisk;
 
+  @Override
   public long getNumOverflowBytesOnDisk() {
     return this.numOverflowBytesOnDisk.get();
   }
 
+  @Override
   public void incNumOverflowBytesOnDisk(long delta) {
     this.numOverflowBytesOnDisk.addAndGet(delta);
 
@@ -793,10 +853,12 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
 
   protected final AtomicLong numEntriesInVM;
 
+  @Override
   public long getNumEntriesInVM() {
     return this.numEntriesInVM.get();
   }
 
+  @Override
   public void incNumEntriesInVM(long delta) {
     this.numEntriesInVM.addAndGet(delta);
   }
@@ -805,6 +867,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
    * Returns true if this region maintains a backup of all its keys and values on disk. Returns
    * false if only values that will not fit in memory are written to disk.
    */
+  @Override
   public boolean isBackup() {
     return this.backup;
   }
@@ -973,6 +1036,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
    *
    * @return an instance of BytesAndBits or Token.REMOVED_PHASE1
    */
+  @Override
   public Object getRaw(DiskId id) {
     this.acquireReadLock();
     try {
@@ -982,6 +1046,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     }
   }
 
+  @Override
   public RegionVersionVector getRegionVersionVector() {
     return this.versionVector;
   }
@@ -1025,10 +1090,12 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     return getDiskStore().getPersistentOplogSet(this);
   }
 
+  @Override
   public String getCompressorClassName() {
     return this.compressorClassName;
   }
 
+  @Override
   public Compressor getCompressor() {
     return this.compressor;
   }
@@ -1038,6 +1105,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     return this.offHeap;
   }
 
+  @Override
   public CachePerfStats getCachePerfStats() {
     return this.ds.getCache().getCachePerfStats();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
index 8613a56..7818c8a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
@@ -33,6 +33,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.annotations.TestingOnly;
 import org.apache.geode.cache.AttributesMutator;
 import org.apache.geode.cache.CacheCallback;
 import org.apache.geode.cache.CacheListener;
@@ -275,22 +276,13 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
     setAttributes(attrs, regionName, internalRegionArgs);
   }
 
-  /**
-   * Unit test constructor. DO NOT USE!
-   *
-   * @since GemFire 8.1
-   * @deprecated For unit testing only. Use
-   *             {@link #AbstractRegion(InternalCache, RegionAttributes, String, InternalRegionArguments)}
-   *             .
-   */
-  @Deprecated
-  AbstractRegion(InternalCache cache, int serialNumber, boolean isPdxTypeRegion,
-      long lastAccessedTime, long lastModifiedTime) {
-    this.cache = cache;
-    this.serialNumber = serialNumber;
-    this.isPdxTypesRegion = isPdxTypeRegion;
-    this.lastAccessedTime = new AtomicLong(lastAccessedTime);
-    this.lastModifiedTime = new AtomicLong(lastModifiedTime);
+  @TestingOnly
+  AbstractRegion() {
+    this.cache = null;
+    this.serialNumber = 0;
+    this.isPdxTypesRegion = false;
+    this.lastAccessedTime = new AtomicLong(0);
+    this.lastModifiedTime = new AtomicLong(0);
   }
 
   /**
@@ -1477,9 +1469,6 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
     // nothing
   }
 
-  /** Throws CacheClosedException or RegionDestroyedException */
-  abstract void checkReadiness();
-
   /**
    * Returns true if this region has no storage
    *
@@ -1546,6 +1535,7 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
   /**
    * Returns true if this region can evict entries.
    */
+  @Override
   public boolean isEntryEvictionPossible() {
     return this.evictionAttributes != null && !this.evictionAttributes.getAlgorithm().isNone();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 0b3329e..c51f93b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -54,6 +54,9 @@ import org.apache.geode.internal.cache.eviction.EvictableEntry;
 import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.ha.HAContainerWrapper;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
+import org.apache.geode.internal.cache.map.CacheModificationLock;
+import org.apache.geode.internal.cache.map.FocusedRegionMap;
+import org.apache.geode.internal.cache.map.RegionMapDestroy;
 import org.apache.geode.internal.cache.persistence.DiskRegionView;
 import org.apache.geode.internal.cache.region.entry.RegionEntryFactoryBuilder;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
@@ -82,6 +85,7 @@ import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
 import org.apache.geode.internal.util.BlobHelper;
+import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
 import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
 
 /**
@@ -89,11 +93,12 @@ import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
  *
  * @since GemFire 3.5.1
  */
-public abstract class AbstractRegionMap implements RegionMap {
+public abstract class AbstractRegionMap
+    implements RegionMap, FocusedRegionMap, CacheModificationLock {
   private static final Logger logger = LogService.getLogger();
 
   /** The underlying map for this region. */
-  protected CustomEntryConcurrentHashMap<Object, Object> map;
+  protected ConcurrentMapWithReusableEntries<Object, Object> map;
 
   /**
    * This test hook is used to force the conditions during entry destroy. This hook is used by
@@ -106,24 +111,24 @@ public abstract class AbstractRegionMap implements RegionMap {
   private Attributes attr;
 
   // the region that owns this map
-  private Object owner;
+  private RegionMapOwner owner;
 
   protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) {
     // do nothing
   }
 
-  protected void initialize(Object owner, Attributes attr,
+  protected void initialize(RegionMapOwner owner, Attributes attr,
       InternalRegionArguments internalRegionArgs, boolean isLRU) {
     _setAttributes(attr);
     setOwner(owner);
-    _setMap(createConcurrentMap(attr.initialCapacity, attr.loadFactor, attr.concurrencyLevel, false,
-        new AbstractRegionEntry.HashRegionEntryCreator()));
+    setEntryMap(createConcurrentMapWithReusableEntries(attr.initialCapacity, attr.loadFactor,
+        attr.concurrencyLevel, false, new AbstractRegionEntry.HashRegionEntryCreator()));
 
     boolean isDisk;
     boolean withVersioning;
     boolean offHeap;
-    if (owner instanceof LocalRegion) {
-      LocalRegion region = (LocalRegion) owner;
+    if (owner instanceof InternalRegion) {
+      InternalRegion region = (InternalRegion) owner;
       isDisk = region.getDiskRegion() != null;
       withVersioning = region.getConcurrencyChecksEnabled();
       offHeap = region.getOffHeap();
@@ -140,8 +145,8 @@ public abstract class AbstractRegionMap implements RegionMap {
         withVersioning, offHeap));
   }
 
-  private CustomEntryConcurrentHashMap<Object, Object> createConcurrentMap(int initialCapacity,
-      float loadFactor, int concurrencyLevel, boolean isIdentityMap,
+  private ConcurrentMapWithReusableEntries<Object, Object> createConcurrentMapWithReusableEntries(
+      int initialCapacity, float loadFactor, int concurrencyLevel, boolean isIdentityMap,
       CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> entryCreator) {
     if (entryCreator != null) {
       return new CustomEntryConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel,
@@ -170,7 +175,7 @@ public abstract class AbstractRegionMap implements RegionMap {
     return this.entryFactory;
   }
 
-  protected void _setAttributes(Attributes a) {
+  private void _setAttributes(Attributes a) {
     this.attr = a;
   }
 
@@ -179,62 +184,71 @@ public abstract class AbstractRegionMap implements RegionMap {
     return this.attr;
   }
 
-  protected LocalRegion _getOwner() {
+  public LocalRegion _getOwner() {
     return (LocalRegion) this.owner;
   }
 
-  protected boolean _isOwnerALocalRegion() {
+  boolean _isOwnerALocalRegion() {
     return this.owner instanceof LocalRegion;
   }
 
-  protected Object _getOwnerObject() {
+  Object _getOwnerObject() {
     return this.owner;
   }
 
+  /**
+   * Tells this map what region owns it.
+   */
+  private void setOwner(RegionMapOwner owner) {
+    this.owner = owner;
+  }
+
   @Override
-  public void setOwner(Object r) {
-    this.owner = r;
+  public ConcurrentMapWithReusableEntries<Object, Object> getCustomEntryConcurrentHashMap() {
+    return map;
   }
 
-  protected CustomEntryConcurrentHashMap<Object, Object> _getMap() {
-    return this.map;
+  @Override
+  public Map<Object, Object> getEntryMap() {
+    return map;
   }
 
-  protected void _setMap(CustomEntryConcurrentHashMap<Object, Object> m) {
-    this.map = m;
+  @Override
+  public void setEntryMap(ConcurrentMapWithReusableEntries<Object, Object> map) {
+    this.map = map;
   }
 
   @Override
   public int size() {
-    return _getMap().size();
+    return getEntryMap().size();
   }
 
   // this is currently used by stats and eviction
   @Override
   public int sizeInVM() {
-    return _getMap().size();
+    return getEntryMap().size();
   }
 
   @Override
   public boolean isEmpty() {
-    return _getMap().isEmpty();
+    return getEntryMap().isEmpty();
   }
 
   @Override
   public Set keySet() {
-    return _getMap().keySet();
+    return getEntryMap().keySet();
   }
 
   @Override
   @SuppressWarnings({"unchecked", "rawtypes"})
   public Collection<RegionEntry> regionEntries() {
-    return (Collection) _getMap().values();
+    return (Collection) getEntryMap().values();
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   public Collection<RegionEntry> regionEntriesInVM() {
-    return (Collection) _getMap().values();
+    return (Collection) getEntryMap().values();
   }
 
   @Override
@@ -251,32 +265,33 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   @Override
   public RegionEntry getEntry(Object key) {
-    RegionEntry re = (RegionEntry) _getMap().get(key);
+    RegionEntry re = (RegionEntry) getEntryMap().get(key);
     return re;
   }
 
-  protected RegionEntry getEntry(EntryEventImpl event) {
+  @Override
+  public RegionEntry getEntry(EntryEventImpl event) {
     return getEntry(event.getKey());
   }
 
 
   @Override
   public RegionEntry getEntryInVM(Object key) {
-    return (RegionEntry) _getMap().get(key);
+    return (RegionEntry) getEntryMap().get(key);
   }
 
 
   @Override
-  public RegionEntry putEntryIfAbsent(Object key, RegionEntry re) {
-    RegionEntry oldRe = (RegionEntry) _getMap().putIfAbsent(key, re);
-    if (oldRe == null && (re instanceof OffHeapRegionEntry) && _isOwnerALocalRegion()
+  public RegionEntry putEntryIfAbsent(Object key, RegionEntry regionEntry) {
+    RegionEntry oldRe = (RegionEntry) getEntryMap().putIfAbsent(key, regionEntry);
+    if (oldRe == null && (regionEntry instanceof OffHeapRegionEntry) && _isOwnerALocalRegion()
         && _getOwner().isThisRegionBeingClosedOrDestroyed()) {
       // prevent orphan during concurrent destroy (#48068)
-      Object v = re.getValue();
+      Object v = regionEntry.getValue();
       if (v != Token.REMOVED_PHASE1 && v != Token.REMOVED_PHASE2 && v instanceof StoredObject
           && ((StoredObject) v).hasRefCount()) {
-        if (_getMap().remove(key, re)) {
-          ((OffHeapRegionEntry) re).release();
+        if (getEntryMap().remove(key, regionEntry)) {
+          ((OffHeapRegionEntry) regionEntry).release();
         }
       }
     }
@@ -285,21 +300,21 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   @Override
   public RegionEntry getOperationalEntryInVM(Object key) {
-    RegionEntry re = (RegionEntry) _getMap().get(key);
+    RegionEntry re = (RegionEntry) getEntryMap().get(key);
     return re;
   }
 
 
   @Override
-  public void removeEntry(Object key, RegionEntry re, boolean updateStat) {
-    if (re.isTombstone() && _getMap().get(key) == re) {
+  public void removeEntry(Object key, RegionEntry regionEntry, boolean updateStat) {
+    if (regionEntry.isTombstone() && getEntryMap().get(key) == regionEntry) {
       logger.fatal(
           LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE),
           new Exception("stack trace"));
       return; // can't remove tombstones except from the tombstone sweeper
     }
-    if (_getMap().remove(key, re)) {
-      re.removePhase2();
+    if (getEntryMap().remove(key, regionEntry)) {
+      regionEntry.removePhase2();
       if (updateStat) {
         incEntryCount(-1);
       }
@@ -307,17 +322,17 @@ public abstract class AbstractRegionMap implements RegionMap {
   }
 
   @Override
-  public void removeEntry(Object key, RegionEntry re, boolean updateStat, EntryEventImpl event,
-      final InternalRegion owner) {
+  public void removeEntry(Object key, RegionEntry regionEntry, boolean updateStat,
+      EntryEventImpl event, final InternalRegion internalRegion) {
     boolean success = false;
-    if (re.isTombstone() && _getMap().get(key) == re) {
+    if (regionEntry.isTombstone() && getEntryMap().get(key) == regionEntry) {
       logger.fatal(
           LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE),
           new Exception("stack trace"));
       return; // can't remove tombstones except from the tombstone sweeper
     }
-    if (_getMap().remove(key, re)) {
-      re.removePhase2();
+    if (getEntryMap().remove(key, regionEntry)) {
+      regionEntry.removePhase2();
       success = true;
       if (updateStat) {
         incEntryCount(-1);
@@ -345,7 +360,7 @@ public abstract class AbstractRegionMap implements RegionMap {
   }
 
   private void _mapClear() {
-    _getMap().clear();
+    getEntryMap().clear();
   }
 
   @Override
@@ -419,7 +434,7 @@ public abstract class AbstractRegionMap implements RegionMap {
 
               boolean tombstone = re.isTombstone();
               // note: it.remove() did not reliably remove the entry so we use remove(K,V) here
-              if (_getMap().remove(re.getKey(), re)) {
+              if (getEntryMap().remove(re.getKey(), re)) {
                 if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
                   GatewaySenderEventImpl.release(re.getValue()); // OFFHEAP _getValue ok
                 }
@@ -448,9 +463,9 @@ public abstract class AbstractRegionMap implements RegionMap {
         incEntryCount(-delta);
         incEntryCount(-tombstones);
         if (logger.isDebugEnabled()) {
-          logger.debug("Size after clearing = {}", _getMap().size());
+          logger.debug("Size after clearing = {}", getEntryMap().size());
         }
-        if (isTraceEnabled && _getMap().size() < 20) {
+        if (isTraceEnabled && getEntryMap().size() < 20) {
           _getOwner().dumpBackingMap();
         }
       }
@@ -493,7 +508,8 @@ public abstract class AbstractRegionMap implements RegionMap {
   /**
    * Tell an LRU that an existing entry has been destroyed
    */
-  protected void lruEntryDestroy(RegionEntry e) {
+  @Override
+  public void lruEntryDestroy(RegionEntry regionEntry) {
     // do nothing by default
   }
 
@@ -532,15 +548,16 @@ public abstract class AbstractRegionMap implements RegionMap {
    * Process an incoming version tag for concurrent operation detection. This must be done before
    * modifying the region entry.
    *
-   * @param re the entry that is to be modified
+   * @param regionEntry the entry that is to be modified
    * @param event the modification to the entry
    * @throws InvalidDeltaException if the event contains a delta that cannot be applied
    * @throws ConcurrentCacheModificationException if the event is in conflict with a previously
    *         applied change
    */
-  private void processVersionTag(RegionEntry re, EntryEventImpl event) {
-    if (re.getVersionStamp() != null) {
-      re.getVersionStamp().processVersionTag(event);
+  @Override
+  public void processVersionTag(RegionEntry regionEntry, EntryEventImpl event) {
+    if (regionEntry.getVersionStamp() != null) {
+      regionEntry.getVersionStamp().processVersionTag(event);
 
       // during initialization we record version tag info to detect ops the
       // image provider hasn't seen
@@ -570,7 +587,7 @@ public abstract class AbstractRegionMap implements RegionMap {
     // so that they will be in the correct order.
     OrderedTombstoneMap<RegionEntry> tombstones = new OrderedTombstoneMap<RegionEntry>();
     if (rm != null) {
-      CustomEntryConcurrentHashMap<Object, Object> other = ((AbstractRegionMap) rm)._getMap();
+      ConcurrentMapWithReusableEntries<Object, Object> other = rm.getCustomEntryConcurrentHashMap();
       Iterator<Map.Entry<Object, Object>> it = other.entrySetWithReusableEntries().iterator();
       while (it.hasNext()) {
         Map.Entry<Object, Object> me = it.next();
@@ -601,7 +618,7 @@ public abstract class AbstractRegionMap implements RegionMap {
           // change the disk stats. This also depends on DiskEntry.Helper.initialize not changing
           // the stats for REMOVED_PHASE1
           copyRecoveredEntry(oldRe, newRe);
-          // newRe is now in this._getMap().
+          // newRe is now in this.getCustomEntryConcurrentHashMap().
           if (newRe.isTombstone()) {
             VersionTag tag = newRe.getVersionStamp().asVersionTag();
             tombstones.put(tag, newRe);
@@ -648,7 +665,7 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   }
 
-  protected void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe) {
+  private void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe) {
     if (newRe.getVersionStamp() != null) {
       newRe.getVersionStamp().setMemberID(oldRe.getVersionStamp().getMemberID());
       newRe.getVersionStamp().setVersions(oldRe.getVersionStamp().asVersionTag());
@@ -658,7 +675,7 @@ public abstract class AbstractRegionMap implements RegionMap {
       ((AbstractOplogDiskRegionEntry) newRe).setDiskId(oldRe);
       _getOwner().getDiskRegion().replaceIncompatibleEntry((DiskEntry) oldRe, (DiskEntry) newRe);
     }
-    _getMap().put(newRe.getKey(), newRe);
+    getEntryMap().put(newRe.getKey(), newRe);
   }
 
   @Override
@@ -679,7 +696,7 @@ public abstract class AbstractRegionMap implements RegionMap {
             if (_isOwnerALocalRegion()) {
               _getOwner().getCachePerfStats().incRetries();
             }
-            _getMap().remove(key, oldRe);
+            getEntryMap().remove(key, oldRe);
             oldRe = putEntryIfAbsent(key, newRe);
           }
           /*
@@ -880,7 +897,7 @@ public abstract class AbstractRegionMap implements RegionMap {
               synchronized (oldRe) {
                 if (oldRe.isRemovedPhase2()) {
                   owner.getCachePerfStats().incRetries();
-                  _getMap().remove(key, oldRe);
+                  getEntryMap().remove(key, oldRe);
                   oldRe = putEntryIfAbsent(key, newRe);
                 } else {
                   boolean acceptedVersionTag = false;
@@ -1047,9 +1064,10 @@ public abstract class AbstractRegionMap implements RegionMap {
     return result;
   }
 
-  protected void initialImagePutEntry(RegionEntry newRe) {}
+  private void initialImagePutEntry(RegionEntry newRe) {}
 
-  boolean confirmEvictionDestroy(RegionEntry re) {
+  @Override
+  public boolean confirmEvictionDestroy(RegionEntry regionEntry) {
     /* We arn't in an LRU context, and should never get here */
     Assert.assertTrue(false, "Not an LRU region, can not confirm LRU eviction operation");
     return true;
@@ -1059,547 +1077,9 @@ public abstract class AbstractRegionMap implements RegionMap {
   public boolean destroy(EntryEventImpl event, boolean inTokenMode, boolean duringRI,
       boolean cacheWrite, boolean isEviction, Object expectedOldValue, boolean removeRecoveredEntry)
       throws CacheWriterException, EntryNotFoundException, TimeoutException {
-
-    final LocalRegion owner = _getOwner();
-
-    if (owner == null) {
-      Assert.assertTrue(false, "The owner for RegionMap " + this // "fix" for bug 32440
-          + " is null for event " + event);
-    }
-
-    boolean retry = true;
-
-    lockForCacheModification(owner, event);
-    try {
-
-      while (retry) {
-        retry = false;
-
-        boolean opCompleted = false;
-        boolean doPart3 = false;
-
-        // We need to acquire the region entry while holding the lock to avoid #45620.
-        // The outer try/finally ensures that the lock will be released without fail.
-        // I'm avoiding indenting just to preserve the ability
-        // to track diffs since the code is fairly complex.
-
-        RegionEntry re = getEntry(event);
-        RegionEntry tombstone = null;
-        boolean haveTombstone = false;
-        /*
-         * Execute the test hook runnable inline (not threaded) if it is not null.
-         */
-        if (null != testHookRunnableForConcurrentOperation) {
-          testHookRunnableForConcurrentOperation.run();
-        }
-
-        try {
-          if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT)
-              && !(owner instanceof HARegion)) {
-            logger.trace(LogMarker.LRU_TOMBSTONE_COUNT,
-                "ARM.destroy() inTokenMode={}; duringRI={}; riLocalDestroy={}; withRepl={}; fromServer={}; concurrencyEnabled={}; isOriginRemote={}; isEviction={}; operation={}; re={}",
-                inTokenMode, duringRI, event.isFromRILocalDestroy(),
-                owner.getDataPolicy().withReplication(), event.isFromServer(),
-                owner.getConcurrencyChecksEnabled(), event.isOriginRemote(), isEviction,
-                event.getOperation(), re);
-          }
-          if (event.isFromRILocalDestroy()) {
-            // for RI local-destroy we don't want to keep tombstones.
-            // In order to simplify things we just set this recovery
-            // flag to true to force the entry to be removed
-            removeRecoveredEntry = true;
-          }
-          // the logic in this method is already very involved, and adding tombstone
-          // permutations to (re != null) greatly complicates it. So, we check
-          // for a tombstone here and, if found, pretend for a bit that the entry is null
-          if (re != null && re.isTombstone() && !removeRecoveredEntry) {
-            tombstone = re;
-            haveTombstone = true;
-            re = null;
-          }
-          IndexManager oqlIndexManager = owner.getIndexManager();
-          if (re == null) {
-            // we need to create an entry if in token mode or if we've received
-            // a destroy from a peer or WAN gateway and we need to retain version
-            // information for concurrency checks
-            boolean retainForConcurrency = (!haveTombstone
-                && (owner.getDataPolicy().withReplication() || event.isFromServer())
-                && owner.getConcurrencyChecksEnabled()
-                && (event.isOriginRemote() /* destroy received from other must create tombstone */
-                    || event.isFromWANAndVersioned() /* wan event must create a tombstone */
-                    || event.isBridgeEvent())); /*
-                                                 * event from client must create a tombstone so
-                                                 * client has a version #
-                                                 */
-            if (inTokenMode || retainForConcurrency) {
-              // removeRecoveredEntry should be false in this case
-              RegionEntry newRe =
-                  getEntryFactory().createEntry(owner, event.getKey(), Token.REMOVED_PHASE1);
-              // Fix for Bug #44431. We do NOT want to update the region and wait
-              // later for index INIT as region.clear() can cause inconsistency if
-              // happened in parallel as it also does index INIT.
-              if (oqlIndexManager != null) {
-                oqlIndexManager.waitForIndexInit();
-              }
-              try {
-                synchronized (newRe) {
-                  RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
-                  while (!opCompleted && oldRe != null) {
-                    synchronized (oldRe) {
-                      if (oldRe.isRemovedPhase2()) {
-                        owner.getCachePerfStats().incRetries();
-                        _getMap().remove(event.getKey(), oldRe);
-                        oldRe = putEntryIfAbsent(event.getKey(), newRe);
-                      } else {
-                        event.setRegionEntry(oldRe);
-
-                        // Last transaction related eviction check. This should
-                        // prevent
-                        // transaction conflict (caused by eviction) when the entry
-                        // is being added to transaction state.
-                        if (isEviction) {
-                          if (!confirmEvictionDestroy(oldRe)) {
-                            opCompleted = false;
-                            return opCompleted;
-                          }
-                        }
-                        try {
-                          // if concurrency checks are enabled, destroy will
-                          // set the version tag
-                          boolean destroyed = destroyEntry(oldRe, event, inTokenMode, cacheWrite,
-                              expectedOldValue, false, removeRecoveredEntry);
-                          if (destroyed) {
-                            if (retainForConcurrency) {
-                              owner.basicDestroyBeforeRemoval(oldRe, event);
-                            }
-                            owner.basicDestroyPart2(oldRe, event, inTokenMode,
-                                false /* conflict with clear */, duringRI, true);
-                            lruEntryDestroy(oldRe);
-                            doPart3 = true;
-                          }
-                        } catch (RegionClearedException rce) {
-                          // Ignore. The exception will ensure that we do not update
-                          // the LRU List
-                          owner.basicDestroyPart2(oldRe, event, inTokenMode,
-                              true/* conflict with clear */, duringRI, true);
-                          doPart3 = true;
-                        } catch (ConcurrentCacheModificationException ccme) {
-                          VersionTag tag = event.getVersionTag();
-                          if (tag != null && tag.isTimeStampUpdated()) {
-                            // Notify gateways of new time-stamp.
-                            owner.notifyTimestampsToGateways(event);
-                          }
-                          throw ccme;
-                        }
-                        re = oldRe;
-                        opCompleted = true;
-                      }
-                    } // synchronized oldRe
-                  } // while
-                  if (!opCompleted) {
-                    // The following try has a finally that cleans up the newRe.
-                    // This is only needed if newRe was added to the map which only
-                    // happens if we didn't get completed with oldRe in the above while loop.
-                    try {
-                      re = newRe;
-                      event.setRegionEntry(newRe);
-
-                      try {
-                        // if concurrency checks are enabled, destroy will
-                        // set the version tag
-                        if (isEviction) {
-                          opCompleted = false;
-                          return opCompleted;
-                        }
-                        opCompleted = destroyEntry(newRe, event, inTokenMode, cacheWrite,
-                            expectedOldValue, true, removeRecoveredEntry);
-                        if (opCompleted) {
-                          // This is a new entry that was created because we are in
-                          // token mode or are accepting a destroy operation by adding
-                          // a tombstone. There is no oldValue, so we don't need to
-                          // call updateSizeOnRemove
-                          // owner.recordEvent(event);
-                          event.setIsRedestroyedEntry(true); // native clients need to know if the
-                                                             // entry didn't exist
-                          if (retainForConcurrency) {
-                            owner.basicDestroyBeforeRemoval(oldRe, event);
-                          }
-                          owner.basicDestroyPart2(newRe, event, inTokenMode,
-                              false /* conflict with clear */, duringRI, true);
-                          doPart3 = true;
-                        }
-                      } catch (RegionClearedException rce) {
-                        // Ignore. The exception will ensure that we do not update
-                        // the LRU List
-                        opCompleted = true;
-                        EntryLogger.logDestroy(event);
-                        owner.basicDestroyPart2(newRe, event, inTokenMode,
-                            true /* conflict with clear */, duringRI, true);
-                        doPart3 = true;
-                      } catch (ConcurrentCacheModificationException ccme) {
-                        VersionTag tag = event.getVersionTag();
-                        if (tag != null && tag.isTimeStampUpdated()) {
-                          // Notify gateways of new time-stamp.
-                          owner.notifyTimestampsToGateways(event);
-                        }
-                        throw ccme;
-                      }
-                      // Note no need for LRU work since the entry is destroyed
-                      // and will be removed when gii completes
-                    } finally {
-                      if (!opCompleted
-                          && !haveTombstone /* to fix bug 51583 do this for all operations */ ) {
-                        removeEntry(event.getKey(), newRe, false);
-                      }
-                      if (!opCompleted && isEviction) {
-                        removeEntry(event.getKey(), newRe, false);
-                      }
-                    }
-                  } // !opCompleted
-                } // synchronized newRe
-              } finally {
-                if (oqlIndexManager != null) {
-                  oqlIndexManager.countDownIndexUpdaters();
-                }
-              }
-            } // inTokenMode or tombstone creation
-            else {
-              if (!isEviction || owner.getConcurrencyChecksEnabled()) {
-                // The following ensures that there is not a concurrent operation
-                // on the entry and leaves behind a tombstone if concurrencyChecksEnabled.
-                // It fixes bug #32467 by propagating the destroy to the server even though
-                // the entry isn't in the client
-                RegionEntry newRe = haveTombstone ? tombstone
-                    : getEntryFactory().createEntry(owner, event.getKey(), Token.REMOVED_PHASE1);
-                synchronized (newRe) {
-                  if (haveTombstone && !tombstone.isTombstone()) {
-                    // we have to check this again under synchronization since it may have changed
-                    retry = true;
-                    // retryEntry = tombstone; // leave this in place for debugging
-                    continue;
-                  }
-                  re = (RegionEntry) _getMap().putIfAbsent(event.getKey(), newRe);
-                  if (re != null && re != tombstone) {
-                    // concurrent change - try again
-                    retry = true;
-                    // retryEntry = tombstone; // leave this in place for debugging
-                    continue;
-                  } else if (!isEviction) {
-                    boolean throwex = false;
-                    EntryNotFoundException ex = null;
-                    try {
-                      if (!cacheWrite) {
-                        throwex = true;
-                      } else {
-                        try {
-                          if (!removeRecoveredEntry) {
-                            throwex = !owner.bridgeWriteBeforeDestroy(event, expectedOldValue);
-                          }
-                        } catch (EntryNotFoundException e) {
-                          throwex = true;
-                          ex = e;
-                        }
-                      }
-                      if (throwex) {
-                        if (!event.isOriginRemote() && !event.getOperation().isLocal()
-                            && (event.isFromBridgeAndVersioned() || // if this is a replayed client
-                                                                    // event that already has a
-                                                                    // version
-                                event.isFromWANAndVersioned())) { // or if this is a WAN event that
-                                                                  // has been applied in another
-                                                                  // system
-                          // we must distribute these since they will update the version information
-                          // in peers
-                          if (logger.isDebugEnabled()) {
-                            logger.debug(
-                                "ARM.destroy is allowing wan/client destroy of {} to continue",
-                                event.getKey());
-                          }
-                          throwex = false;
-                          event.setIsRedestroyedEntry(true);
-                          // Distribution of this op happens on re and re might me null here before
-                          // distributing this destroy op.
-                          if (re == null) {
-                            re = newRe;
-                          }
-                          doPart3 = true;
-                        }
-                      }
-                      if (throwex) {
-                        if (ex == null) {
-                          // Fix for 48182, check cache state and/or region state before sending
-                          // entry not found.
-                          // this is from the server and any exceptions will propogate to the client
-                          owner.checkEntryNotFound(event.getKey());
-                        } else {
-                          throw ex;
-                        }
-                      }
-                    } finally {
-                      // either remove the entry or leave a tombstone
-                      try {
-                        if (!event.isOriginRemote() && event.getVersionTag() != null
-                            && owner.getConcurrencyChecksEnabled()) {
-                          // this shouldn't fail since we just created the entry.
-                          // it will either generate a tag or apply a server's version tag
-                          processVersionTag(newRe, event);
-                          if (doPart3) {
-                            owner.generateAndSetVersionTag(event, newRe);
-                          }
-                          try {
-                            owner.recordEvent(event);
-                            newRe.makeTombstone(owner, event.getVersionTag());
-                          } catch (RegionClearedException e) {
-                            // that's okay - when writing a tombstone into a disk, the
-                            // region has been cleared (including this tombstone)
-                          }
-                          opCompleted = true;
-                          // lruEntryCreate(newRe);
-                        } else if (!haveTombstone) {
-                          try {
-                            assert newRe != tombstone;
-                            newRe.setValue(owner, Token.REMOVED_PHASE2);
-                            removeEntry(event.getKey(), newRe, false);
-                          } catch (RegionClearedException e) {
-                            // that's okay - we just need to remove the new entry
-                          }
-                        } else if (event.getVersionTag() != null) { // haveTombstone - update the
-                                                                    // tombstone version info
-                          processVersionTag(tombstone, event);
-                          if (doPart3) {
-                            owner.generateAndSetVersionTag(event, newRe);
-                          }
-                          // This is not conflict, we need to persist the tombstone again with new
-                          // version tag
-                          try {
-                            tombstone.setValue(owner, Token.TOMBSTONE);
-                          } catch (RegionClearedException e) {
-                            // that's okay - when writing a tombstone into a disk, the
-                            // region has been cleared (including this tombstone)
-                          }
-                          owner.recordEvent(event);
-                          owner.rescheduleTombstone(tombstone, event.getVersionTag());
-                          owner.basicDestroyPart2(tombstone, event, inTokenMode,
-                              true /* conflict with clear */, duringRI, true);
-                          opCompleted = true;
-                        } else {
-                          Assert.assertTrue(event.getVersionTag() == null);
-                          Assert.assertTrue(newRe == tombstone);
-                          event.setVersionTag(getVersionTagFromStamp(tombstone.getVersionStamp()));
-                        }
-                      } catch (ConcurrentCacheModificationException ccme) {
-                        VersionTag tag = event.getVersionTag();
-                        if (tag != null && tag.isTimeStampUpdated()) {
-                          // Notify gateways of new time-stamp.
-                          owner.notifyTimestampsToGateways(event);
-                        }
-                        throw ccme;
-                      }
-                    }
-                  }
-                } // synchronized(newRe)
-              }
-            }
-          } // no current entry
-          else { // current entry exists
-            if (oqlIndexManager != null) {
-              oqlIndexManager.waitForIndexInit();
-            }
-            try {
-              synchronized (re) {
-                owner.checkReadiness();
-                // if the entry is a tombstone and the event is from a peer or a client
-                // then we allow the operation to be performed so that we can update the
-                // version stamp. Otherwise we would retain an old version stamp and may allow
-                // an operation that is older than the destroy() to be applied to the cache
-                // Bug 45170: If removeRecoveredEntry, we treat tombstone as regular entry to be
-                // deleted
-                boolean createTombstoneForConflictChecks =
-                    (owner.getConcurrencyChecksEnabled() && (event.isOriginRemote()
-                        || event.getContext() != null || removeRecoveredEntry));
-                if (!re.isRemoved() || createTombstoneForConflictChecks) {
-                  if (re.isRemovedPhase2()) {
-                    _getMap().remove(event.getKey(), re);
-                    owner.getCachePerfStats().incRetries();
-                    retry = true;
-                    continue;
-                  }
-                  if (!event.isOriginRemote() && event.getOperation().isExpiration()) {
-                    // If this expiration started locally then only do it if the RE is not being
-                    // used by a tx.
-                    if (re.isInUseByTransaction()) {
-                      opCompleted = false;
-                      return opCompleted;
-                    }
-                  }
-                  event.setRegionEntry(re);
-
-                  // See comment above about eviction checks
-                  if (isEviction) {
-                    assert expectedOldValue == null;
-                    if (!confirmEvictionDestroy(re)) {
-                      opCompleted = false;
-                      return opCompleted;
-                    }
-                  }
-
-                  boolean removed = false;
-                  try {
-                    opCompleted = destroyEntry(re, event, inTokenMode, cacheWrite, expectedOldValue,
-                        false, removeRecoveredEntry);
-                    if (opCompleted) {
-                      // It is very, very important for Partitioned Regions to keep
-                      // the entry in the map until after distribution occurs so that other
-                      // threads performing a create on this entry wait until the destroy
-                      // distribution is finished.
-                      // keeping backup copies consistent. Fix for bug 35906.
-                      // -- mthomas 07/02/2007 <-- how about that date, kinda cool eh?
-                      owner.basicDestroyBeforeRemoval(re, event);
-
-                      // do this before basicDestroyPart2 to fix bug 31786
-                      if (!inTokenMode) {
-                        if (re.getVersionStamp() == null) {
-                          re.removePhase2();
-                          removeEntry(event.getKey(), re, true, event, owner);
-                          removed = true;
-                        }
-                      }
-                      if (inTokenMode && !duringRI) {
-                        event.inhibitCacheListenerNotification(true);
-                      }
-                      doPart3 = true;
-                      owner.basicDestroyPart2(re, event, inTokenMode,
-                          false /* conflict with clear */, duringRI, true);
-                      // if (!re.isTombstone() || isEviction) {
-                      lruEntryDestroy(re);
-                      // } else {
-                      // lruEntryUpdate(re);
-                      // lruUpdateCallback = true;
-                      // }
-                    } else {
-                      if (!inTokenMode) {
-                        EntryLogger.logDestroy(event);
-                        owner.recordEvent(event);
-                        if (re.getVersionStamp() == null) {
-                          re.removePhase2();
-                          removeEntry(event.getKey(), re, true, event, owner);
-                          lruEntryDestroy(re);
-                        } else {
-                          if (re.isTombstone()) {
-                            // the entry is already a tombstone, but we're destroying it
-                            // again, so we need to reschedule the tombstone's expiration
-                            if (event.isOriginRemote()) {
-                              owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
-                            }
-                          }
-                        }
-                        lruEntryDestroy(re);
-                        opCompleted = true;
-                      }
-                    }
-                  } catch (RegionClearedException rce) {
-                    // Ignore. The exception will ensure that we do not update
-                    // the LRU List
-                    opCompleted = true;
-                    owner.recordEvent(event);
-                    if (inTokenMode && !duringRI) {
-                      event.inhibitCacheListenerNotification(true);
-                    }
-                    owner.basicDestroyPart2(re, event, inTokenMode, true /* conflict with clear */,
-                        duringRI, true);
-                    doPart3 = true;
-                  } finally {
-                    owner.checkReadiness();
-                    if (re.isRemoved() && !re.isTombstone()) {
-                      if (!removed) {
-                        removeEntry(event.getKey(), re, true, event, owner);
-                      }
-                    }
-                  }
-                } // !isRemoved
-                else { // already removed
-                  if (re.isTombstone() && event.getVersionTag() != null) {
-                    // if we're dealing with a tombstone and this is a remote event
-                    // (e.g., from cache client update thread) we need to update
-                    // the tombstone's version information
-                    // TODO use destroyEntry() here
-                    processVersionTag(re, event);
-                    try {
-                      re.makeTombstone(owner, event.getVersionTag());
-                    } catch (RegionClearedException e) {
-                      // that's okay - when writing a tombstone into a disk, the
-                      // region has been cleared (including this tombstone)
-                    }
-                  }
-                  if (expectedOldValue != null) {
-                    // if re is removed then there is no old value, so return false
-                    return false;
-                  }
-
-                  if (!inTokenMode && !isEviction) {
-                    owner.checkEntryNotFound(event.getKey());
-                  }
-                }
-              } // synchronized re
-            } catch (ConcurrentCacheModificationException ccme) {
-              VersionTag tag = event.getVersionTag();
-              if (tag != null && tag.isTimeStampUpdated()) {
-                // Notify gateways of new time-stamp.
-                owner.notifyTimestampsToGateways(event);
-              }
-              throw ccme;
-            } finally {
-              if (oqlIndexManager != null) {
-                oqlIndexManager.countDownIndexUpdaters();
-              }
-            }
-            // No need to call lruUpdateCallback since the only lru action
-            // we may have taken was lruEntryDestroy. This fixes bug 31759.
-
-          } // current entry exists
-          if (opCompleted) {
-            EntryLogger.logDestroy(event);
-          }
-          return opCompleted;
-        } finally {
-          try {
-            // If concurrency conflict is there and event contains gateway version tag then
-            // do NOT distribute.
-            if (event.isConcurrencyConflict()
-                && (event.getVersionTag() != null && event.getVersionTag().isGatewayTag())) {
-              doPart3 = false;
-            }
-            // distribution and listener notification
-            if (doPart3) {
-              owner.basicDestroyPart3(re, event, inTokenMode, duringRI, true, expectedOldValue);
-            }
-          } finally {
-            if (opCompleted) {
-              if (re != null) {
-                // we only want to cancel if concurrency-check is not enabled
-                // re(regionentry) will be null when concurrency-check is enable and removeTombstone
-                // method
-                // will call cancelExpiryTask on regionEntry
-                owner.cancelExpiryTask(re);
-              }
-            }
-          }
-        }
-
-      } // retry loop
-    } finally { // failsafe on the read lock...see comment above
-      releaseCacheModificationLock(owner, event);
-    }
-    return false;
-  }
-
-  private VersionTag getVersionTagFromStamp(VersionStamp stamp) {
-    VersionTag tag = VersionTag.create(stamp.getMemberID());
-    tag.setEntryVersion(stamp.getEntryVersion());
-    tag.setRegionVersion(stamp.getRegionVersion());
-    tag.setVersionTimeStamp(stamp.getVersionTimeStamp());
-    tag.setDistributedSystemId(stamp.getDistributedSystemId());
-    return tag;
+    RegionMapDestroy regionMapDestroy = new RegionMapDestroy((InternalRegion) owner, this, this);
+    return regionMapDestroy.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry);
   }
 
   @Override
@@ -1731,7 +1211,7 @@ public abstract class AbstractRegionMap implements RegionMap {
               synchronized (oldRe) {
                 if (oldRe.isRemovedPhase2()) {
                   owner.getCachePerfStats().incRetries();
-                  _getMap().remove(key, oldRe);
+                  getEntryMap().remove(key, oldRe);
                   oldRe = putEntryIfAbsent(key, newRe);
                 } else {
                   try {
@@ -1950,7 +1430,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                     // proceed to phase 2 of removal.
                     if (oldRe.isRemovedPhase2()) {
                       owner.getCachePerfStats().incRetries();
-                      _getMap().remove(event.getKey(), oldRe);
+                      getEntryMap().remove(event.getKey(), oldRe);
                       oldRe = putEntryIfAbsent(event.getKey(), newRe);
                     } else {
                       opCompleted = true;
@@ -2317,8 +1797,8 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   }
 
-  protected void invalidateNewEntry(EntryEventImpl event, final LocalRegion owner,
-      RegionEntry newRe) throws RegionClearedException {
+  private void invalidateNewEntry(EntryEventImpl event, final LocalRegion owner, RegionEntry newRe)
+      throws RegionClearedException {
     processVersionTag(newRe, event);
     event.putNewEntry(owner, newRe);
     owner.recordEvent(event);
@@ -2429,7 +1909,7 @@ public abstract class AbstractRegionMap implements RegionMap {
               synchronized (oldRe) {
                 if (oldRe.isRemovedPhase2()) {
                   owner.getCachePerfStats().incRetries();
-                  _getMap().remove(key, oldRe);
+                  getEntryMap().remove(key, oldRe);
                   oldRe = putEntryIfAbsent(key, newRe);
                 } else {
                   opCompleted = true;
@@ -2787,7 +2267,7 @@ public abstract class AbstractRegionMap implements RegionMap {
             // and change its state
             if (re.isRemovedPhase2()) {
               _getOwner().getCachePerfStats().incRetries();
-              _getMap().remove(event.getKey(), re);
+              getEntryMap().remove(event.getKey(), re);
               re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, onlyExisting,
                   false);
               if (re == null) {
@@ -3120,21 +2600,6 @@ public abstract class AbstractRegionMap implements RegionMap {
     return true;
   }
 
-  protected boolean destroyEntry(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
-      boolean cacheWrite, @Released Object expectedOldValue, boolean forceDestroy,
-      boolean removeRecoveredEntry) throws CacheWriterException, TimeoutException,
-      EntryNotFoundException, RegionClearedException {
-    processVersionTag(re, event);
-    final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
-    boolean retVal = re.destroy(event.getLocalRegion(), event, inTokenMode, cacheWrite,
-        expectedOldValue, forceDestroy, removeRecoveredEntry);
-    if (retVal) {
-      EntryLogger.logDestroy(event);
-      _getOwner().updateSizeOnRemove(event.getKey(), oldSize);
-    }
-    return retVal;
-  }
-
   @Override
   public void txApplyPut(Operation p_putOp, Object key, Object nv, boolean didDestroy,
       TransactionId txId, TXRmtEvent txEvent, EventID eventId, Object aCallbackArgument,
@@ -3285,7 +2750,7 @@ public abstract class AbstractRegionMap implements RegionMap {
               synchronized (oldRe) {
                 if (oldRe.isRemovedPhase2()) {
                   owner.getCachePerfStats().incRetries();
-                  _getMap().remove(key, oldRe);
+                  getEntryMap().remove(key, oldRe);
                   oldRe = putEntryIfAbsent(key, newRe);
                 } else {
                   opCompleted = true;
@@ -3542,9 +3007,9 @@ public abstract class AbstractRegionMap implements RegionMap {
   }
 
   public void dumpMap() {
-    logger.info("dump of concurrent map of size {} for region {}", this._getMap().size(),
+    logger.info("dump of concurrent map of size {} for region {}", getEntryMap().size(),
         this._getOwner());
-    for (Iterator it = this._getMap().values().iterator(); it.hasNext();) {
+    for (Iterator it = getEntryMap().values().iterator(); it.hasNext();) {
       logger.info("dumpMap:" + it.next().toString());
     }
   }
@@ -3718,9 +3183,9 @@ public abstract class AbstractRegionMap implements RegionMap {
     // }
   }
 
-
   /** get version-generation permission from the region's version vector */
-  void lockForCacheModification(LocalRegion owner, EntryEventImpl event) {
+  @Override
+  public void lockForCacheModification(InternalRegion owner, EntryEventImpl event) {
     boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.getDataPolicy().withReplication();
 
     if (armLockTestHook != null)
@@ -3739,7 +3204,8 @@ public abstract class AbstractRegionMap implements RegionMap {
   }
 
   /** release version-generation permission from the region's version vector */
-  void releaseCacheModificationLock(LocalRegion owner, EntryEventImpl event) {
+  @Override
+  public void releaseCacheModificationLock(InternalRegion owner, EntryEventImpl event) {
     boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.getDataPolicy().withReplication();
 
     if (armLockTestHook != null)
@@ -3889,15 +3355,15 @@ public abstract class AbstractRegionMap implements RegionMap {
     return result;
   }
 
-  protected boolean removeTombstone(RegionEntry re) {
-    return _getMap().remove(re.getKey(), re);
+  private boolean removeTombstone(RegionEntry re) {
+    return getEntryMap().remove(re.getKey(), re);
   }
 
   // method used for debugging tombstone count issues
   public boolean verifyTombstoneCount(AtomicInteger numTombstones) {
     int deadEntries = 0;
     try {
-      for (Iterator it = _getMap().values().iterator(); it.hasNext();) {
+      for (Iterator it = getEntryMap().values().iterator(); it.hasNext();) {
         RegionEntry re = (RegionEntry) it.next();
         if (re.isTombstone()) {
           deadEntries++;
@@ -3958,21 +3424,21 @@ public abstract class AbstractRegionMap implements RegionMap {
   public void updateEvictionCounter() {}
 
   public interface ARMLockTestHook {
-    public void beforeBulkLock(LocalRegion region);
+    public void beforeBulkLock(InternalRegion region);
 
-    public void afterBulkLock(LocalRegion region);
+    public void afterBulkLock(InternalRegion region);
 
-    public void beforeBulkRelease(LocalRegion region);
+    public void beforeBulkRelease(InternalRegion region);
 
-    public void afterBulkRelease(LocalRegion region);
+    public void afterBulkRelease(InternalRegion region);
 
-    public void beforeLock(LocalRegion region, CacheEvent event);
+    public void beforeLock(InternalRegion region, CacheEvent event);
 
-    public void afterLock(LocalRegion region, CacheEvent event);
+    public void afterLock(InternalRegion region, CacheEvent event);
 
-    public void beforeRelease(LocalRegion region, CacheEvent event);
+    public void beforeRelease(InternalRegion region, CacheEvent event);
 
-    public void afterRelease(LocalRegion region, CacheEvent event);
+    public void afterRelease(InternalRegion region, CacheEvent event);
 
     public void beforeStateFlushWait();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index ac25c0c..d754f02 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -1224,7 +1224,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   @Override
-  protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
+  public void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
     long token = -1;
     DestroyOperation op = null;
     try {
@@ -2137,7 +2137,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   @Override
-  void updateSizeOnRemove(Object key, int oldSize) {
+  public void updateSizeOnRemove(Object key, int oldSize) {
     this.partitionedRegion.getPrStats().incDataStoreEntryCount(-1);
     updateBucket2Size(oldSize, 0, SizeOp.DESTROY);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 16af9a9..978aa3c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -1626,7 +1626,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   }
 
   @Override
-  void basicDestroyPart3(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
+  public void basicDestroyPart3(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
       boolean duringRI, boolean invokeCallbacks, Object expectedOldValue) {
 
     distributeDestroy(event, expectedOldValue);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 5b0f704..761f6e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -91,8 +91,8 @@ import org.apache.geode.pdx.internal.PeerTypeRegistration;
  *
  * must be public for DataSerializableFixedID
  */
-public class EntryEventImpl
-    implements EntryEvent, InternalCacheEvent, DataSerializableFixedID, EntryOperation, Releasable {
+public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
+    DataSerializableFixedID, EntryOperation, Releasable {
   private static final Logger logger = LogService.getLogger();
 
   // PACKAGE FIELDS //
@@ -1594,7 +1594,7 @@ public class EntryEventImpl
     setNewValueInRegion(owner, reentry, null);
   }
 
-  void setRegionEntry(RegionEntry re) {
+  public void setRegionEntry(RegionEntry re) {
     this.re = re;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EvictableRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EvictableRegion.java
index 3fdd79c..0e85679 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EvictableRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EvictableRegion.java
@@ -18,7 +18,8 @@ import org.apache.geode.StatisticsFactory;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.internal.cache.eviction.EvictionController;
 
-public interface EvictableRegion {
+public interface EvictableRegion extends RegionMapOwner {
+
   EvictionAttributes getEvictionAttributes();
 
   boolean getOffHeap();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
similarity index 70%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
index c831fa6..3b0e51e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalEntryEvent.java
@@ -12,20 +12,14 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
+import org.apache.geode.cache.EntryEvent;
+
 /**
- * Internal implementation of {@link RegionMap} for regions stored in normal VM memory.
- *
- * @since GemFire 3.5.1
- *
- *
+ * Internal entry-event API: methods that are not exposed to users via {@link EntryEvent}.
  */
-class VMRegionMap extends AbstractRegionMap {
+public interface InternalEntryEvent extends EntryEvent {
 
-  VMRegionMap(Object owner, Attributes attr, InternalRegionArguments internalRegionArgs) {
-    super(internalRegionArgs);
-    initialize(owner, attr, internalRegionArgs, false/* isLRU */);
-  }
+  void setRegionEntry(RegionEntry re);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 86b8992..d28c7aa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -47,8 +47,8 @@ import org.apache.geode.internal.cache.versions.VersionTag;
  * </ul>
  */
 @SuppressWarnings("rawtypes")
-public interface InternalRegion
-    extends Region, HasCachePerfStats, RegionEntryContext, RegionAttributes, HasDiskRegion {
+public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryContext,
+    RegionAttributes, HasDiskRegion, RegionMapOwner {
 
   CachePerfStats getCachePerfStats();
 
@@ -140,4 +140,35 @@ public interface InternalRegion
 
   RegionMap getRegionMap();
 
+  void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event);
+
+  void basicDestroyPart2(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
+      boolean conflictWithClear, boolean duringRI, boolean invokeCallbacks);
+
+  void notifyTimestampsToGateways(EntryEventImpl event);
+
+  boolean bridgeWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
+      throws CacheWriterException, EntryNotFoundException, TimeoutException;
+
+  void checkEntryNotFound(Object entryKey);
+
+  void rescheduleTombstone(RegionEntry entry, VersionTag version);
+
+  /** Throws CacheClosedException or RegionDestroyedException */
+  void checkReadiness();
+
+  void basicDestroyPart3(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
+      boolean duringRI, boolean invokeCallbacks, Object expectedOldValue);
+
+  void cancelExpiryTask(RegionEntry regionEntry);
+
+  boolean hasServerProxy();
+
+  int calculateRegionEntryValueSize(RegionEntry re);
+
+  void updateSizeOnRemove(Object key, int oldSize);
+
+  boolean isEntryEvictionPossible();
+
+  KeyInfo getKeyInfo(Object key);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index aca96d0..17f8152 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -791,6 +791,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return this.serverRegionProxy;
   }
 
+  @Override
   public boolean hasServerProxy() {
     return this.serverRegionProxy != null;
   }
@@ -2720,7 +2721,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    *
    * @param entryKey the missing entry's key.
    */
-  void checkEntryNotFound(Object entryKey) {
+  @Override
+  public void checkEntryNotFound(Object entryKey) {
     checkReadiness();
     // Localized string for partitioned region is generic enough for general use
     throw new EntryNotFoundException(
@@ -2919,7 +2921,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   /** @return true if this was a client region; false if not */
-  boolean bridgeWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
+  @Override
+  public boolean bridgeWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
       throws CacheWriterException, EntryNotFoundException, TimeoutException {
     if (hasServerProxy()) {
       serverDestroy(event, expectedOldValue);
@@ -3235,7 +3238,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     getGemFireCache().getTombstoneService().scheduleTombstone(this, entry, destroyedVersion);
   }
 
-  void rescheduleTombstone(RegionEntry entry, VersionTag version) {
+  @Override
+  public void rescheduleTombstone(RegionEntry entry, VersionTag version) {
     scheduleTombstone(entry, version, true);
   }
 
@@ -5836,7 +5840,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   /**
    * This notifies all WAN sites about updated timestamp on local site.
    */
-  void notifyTimestampsToGateways(EntryEventImpl event) {
+  @Override
+  public void notifyTimestampsToGateways(EntryEventImpl event) {
     // Create updateTimeStampEvent from event.
     VersionTagHolder updateTimeStampEvent = new VersionTagHolder(event.getVersionTag());
     updateTimeStampEvent.setOperation(Operation.UPDATE_VERSION_STAMP);
@@ -6622,7 +6627,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * @param event the event describing the destroy operation
    * @since GemFire 5.1
    */
-  protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
+  @Override
+  public void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
     // do nothing
   }
 
@@ -6630,7 +6636,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * Called by lower levels, while still holding the write sync lock, and the low level has
    * completed its part of the basic destroy
    */
-  void basicDestroyPart2(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
+  @Override
+  public void basicDestroyPart2(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
       boolean conflictWithClear, boolean duringRI, boolean invokeCallbacks) {
     if (!(this instanceof HARegion)) {
       if (logger.isTraceEnabled()) {
@@ -6697,7 +6704,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * distribution and callback notification are done in part2 inside entry lock for maintaining the
    * order of events.
    */
-  void basicDestroyPart3(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
+  @Override
+  public void basicDestroyPart3(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
       boolean duringRI, boolean invokeCallbacks, Object expectedOldValue) {
 
     if (invokeCallbacks) {
@@ -8118,7 +8126,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
-  void cancelExpiryTask(RegionEntry regionEntry) {
+  @Override
+  public void cancelExpiryTask(RegionEntry regionEntry) {
     cancelExpiryTask(regionEntry, null);
   }
 
@@ -10721,7 +10730,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     // Only needed by BucketRegion
   }
 
-  void updateSizeOnRemove(Object key, int oldSize) {
+  public void updateSizeOnRemove(Object key, int oldSize) {
     // Only needed by BucketRegion
   }
 
@@ -10848,6 +10857,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   /**
    * @return the wrapped {@link KeyInfo}
    */
+  @Override
   public KeyInfo getKeyInfo(Object key) {
     return new KeyInfo(key, null, null);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/OfflineCompactionDiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/OfflineCompactionDiskRegion.java
index b0a1ea2..114bc21 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/OfflineCompactionDiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/OfflineCompactionDiskRegion.java
@@ -45,40 +45,49 @@ public class OfflineCompactionDiskRegion extends DiskRegion implements DiskRecov
   }
 
   ///////////// DiskRecoveryStore methods ////////////////
+  @Override
   public DiskRegionView getDiskRegionView() {
     return this;
   }
 
+  @Override
   public DiskEntry getDiskEntry(Object key) {
     return null;
   }
 
+  @Override
   public DiskEntry initializeRecoveredEntry(Object key, DiskEntry.RecoveredEntry re) {
     throw new IllegalStateException(
         "updateRecoveredEntry should not be called when offline compacting");
   }
 
+  @Override
   public DiskEntry updateRecoveredEntry(Object key, DiskEntry.RecoveredEntry re) {
     throw new IllegalStateException(
         "updateRecoveredEntry should not be called when offline compacting");
   }
 
+  @Override
   public void destroyRecoveredEntry(Object key) {}
 
+  @Override
   public void foreachRegionEntry(LocalRegion.RegionEntryCallback callback) {
     throw new IllegalStateException(
         "foreachRegionEntry should not be called when offline compacting");
   }
 
+  @Override
   public boolean lruLimitExceeded() {
     return false;
   }
 
+  @Override
   public void copyRecoveredEntries(RegionMap rm) {
     throw new IllegalStateException(
         "copyRecoveredEntries should not be called on OfflineCompactionDiskRegion");
   }
 
+  @Override
   public void updateSizeOnFaultIn(Object key, int newSize, int bytesOnDisk) {
     throw new IllegalStateException(
         "updateSizeOnFaultIn should not be called on OfflineCompactionDiskRegion");
@@ -90,20 +99,18 @@ public class OfflineCompactionDiskRegion extends DiskRegion implements DiskRecov
   }
 
   @Override
-  public int calculateRegionEntryValueSize(RegionEntry re) {
-    return 0;
-  }
-
   public RegionMap getRegionMap() {
     throw new IllegalStateException(
         "getRegionMap should not be called on OfflineCompactionDiskRegion");
   }
 
+  @Override
   public void handleDiskAccessException(DiskAccessException dae) {
     throw new IllegalStateException(
         "handleDiskAccessException should not be called on OfflineCompactionDiskRegion");
   }
 
+  @Override
   public void initializeStats(long numEntriesInVM, long numOverflowOnDisk,
       long numOverflowBytesOnDisk) {
     throw new IllegalStateException(
@@ -113,6 +120,7 @@ public class OfflineCompactionDiskRegion extends DiskRegion implements DiskRecov
 
 
   public static class DummyDiskExceptionHandler implements DiskExceptionHandler {
+    @Override
     public void handleDiskAccessException(DiskAccessException dae) {
       // nothing needed
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PlaceHolderDiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PlaceHolderDiskRegion.java
index 62fc9b9..68ed60a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PlaceHolderDiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PlaceHolderDiskRegion.java
@@ -31,7 +31,8 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
  *
  * @since GemFire prPersistSprint2
  */
-public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRecoveryStore {
+public class PlaceHolderDiskRegion extends AbstractDiskRegion
+    implements DiskRecoveryStore, RegionMapOwner {
 
   private final String name;
 
@@ -70,6 +71,7 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
     // nothing needed
   }
 
+  @Override
   public void finishPendingDestroy() {
     // nothing needed
   }
@@ -84,10 +86,12 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
   }
 
   // DiskRecoveryStore methods
+  @Override
   public DiskRegionView getDiskRegionView() {
     return this;
   }
 
+  @Override
   public DiskEntry getDiskEntry(Object key) {
     RegionEntry re = getRecoveredEntryMap().getEntry(key);
     if (re != null && re.isRemoved() && !re.isTombstone()) {
@@ -96,6 +100,7 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
     return (DiskEntry) re;
   }
 
+  @Override
   public DiskEntry initializeRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
     RegionEntry re = getRecoveredEntryMap().initRecoveredEntry(key, value);
     if (re == null) {
@@ -105,30 +110,37 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
     return (DiskEntry) re;
   }
 
+  @Override
   public DiskEntry updateRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
     return (DiskEntry) getRecoveredEntryMap().updateRecoveredEntry(key, value);
   }
 
+  @Override
   public void destroyRecoveredEntry(Object key) {
     throw new IllegalStateException("destroyRecoveredEntry should not be called during recovery");
   }
 
+  @Override
   public void copyRecoveredEntries(RegionMap rm) {
     throw new IllegalStateException("copyRecoveredEntries should not be called during recovery");
   }
 
+  @Override
   public void foreachRegionEntry(LocalRegion.RegionEntryCallback callback) {
     throw new IllegalStateException("foreachRegionEntry should not be called during recovery");
   }
 
+  @Override
   public boolean lruLimitExceeded() {
     return getRecoveredEntryMap().lruLimitExceeded(this);
   }
 
+  @Override
   public DiskStoreID getDiskStoreID() {
     return getDiskStore().getDiskStoreID();
   }
 
+  @Override
   public void acquireReadLock() {
     // not needed. The only thread
     // using this method is the async recovery thread.
@@ -137,10 +149,12 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
 
   }
 
+  @Override
   public void releaseReadLock() {
     // not needed
   }
 
+  @Override
   public void updateSizeOnFaultIn(Object key, int newSize, int bytesOnDisk) {
     // only used by bucket regions
   }
@@ -151,10 +165,6 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
   }
 
   @Override
-  public int calculateRegionEntryValueSize(RegionEntry re) {
-    return 0;
-  }
-
   public RegionMap getRegionMap() {
     return getRecoveredEntryMap();
   }
@@ -167,6 +177,7 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
     }
   }
 
+  @Override
   public void handleDiskAccessException(DiskAccessException dae) {
     getDiskStore().getCache().getLoggerI18n().error(
         LocalizedStrings.PlaceHolderDiskRegion_A_DISKACCESSEXCEPTION_HAS_OCCURRED_WHILE_RECOVERING_FROM_DISK,
@@ -174,26 +185,32 @@ public class PlaceHolderDiskRegion extends AbstractDiskRegion implements DiskRec
 
   }
 
+  @Override
   public boolean didClearCountChange() {
     return false;
   }
 
+  @Override
   public CancelCriterion getCancelCriterion() {
     return getDiskStore().getCancelCriterion();
   }
 
+  @Override
   public boolean isSync() {
     return true;
   }
 
+  @Override
   public void endRead(long start, long end, long bytesRead) {
     // do nothing
   }
 
+  @Override
   public boolean isRegionClosed() {
     return false;
   }
 
+  @Override
   public void initializeStats(long numEntriesInVM, long numOverflowOnDisk,
       long numOverflowBytesOnDisk) {
     this.numEntriesInVM.set(numEntriesInVM);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
index 7ab81c9..e58faf8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
@@ -48,6 +48,7 @@ import org.apache.geode.internal.cache.versions.VersionStamp;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
 
 /**
  * Internal implementation of {@link RegionMap}for regions whose DataPolicy is proxy. Proxy maps are
@@ -81,11 +82,6 @@ class ProxyRegionMap implements RegionMap {
   }
 
   @Override
-  public void setOwner(Object r) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public void changeOwner(LocalRegion r) {
     throw new UnsupportedOperationException();
   }
@@ -102,7 +98,7 @@ class ProxyRegionMap implements RegionMap {
 
   @Override
   public Set keySet() {
-    return Collections.EMPTY_SET;
+    return Collections.emptySet();
   }
 
   @Override
@@ -920,4 +916,14 @@ class ProxyRegionMap implements RegionMap {
 
   @Override
   public void updateEvictionCounter() {}
+
+  @Override
+  public ConcurrentMapWithReusableEntries<Object, Object> getCustomEntryConcurrentHashMap() {
+    return null;
+  }
+
+  @Override
+  public void setEntryMap(ConcurrentMapWithReusableEntries<Object, Object> map) {
+
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
index b60cf20..2b3a28c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
@@ -34,6 +34,7 @@ import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionHolder;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
 
 /**
  * Internal interface used by {@link LocalRegion} to access the map that holds its entries. Note
@@ -80,11 +81,6 @@ public interface RegionMap extends EvictableMap {
    */
   Attributes getAttributes();
 
-  /**
-   * Tells this map what region owns it.
-   */
-  void setOwner(Object r);
-
   void changeOwner(LocalRegion r);
 
   int size();
@@ -372,4 +368,7 @@ public interface RegionMap extends EvictableMap {
 
   void updateEvictionCounter();
 
+  ConcurrentMapWithReusableEntries<Object, Object> getCustomEntryConcurrentHashMap();
+
+  void setEntryMap(ConcurrentMapWithReusableEntries<Object, Object> map);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapOwner.java
similarity index 69%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapOwner.java
index c831fa6..1f5ce01 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapOwner.java
@@ -12,20 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
-/**
- * Internal implementation of {@link RegionMap} for regions stored in normal VM memory.
- *
- * @since GemFire 3.5.1
- *
- *
- */
-class VMRegionMap extends AbstractRegionMap {
-
-  VMRegionMap(Object owner, Attributes attr, InternalRegionArguments internalRegionArgs) {
-    super(internalRegionArgs);
-    initialize(owner, attr, internalRegionArgs, false/* isLRU */);
-  }
+public interface RegionMapOwner {
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/VMLRURegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/VMLRURegionMap.java
index ab916ae..236c154 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/VMLRURegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/VMLRURegionMap.java
@@ -53,17 +53,17 @@ import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
  *
  * @since GemFire 3.5.1
  */
-class VMLRURegionMap extends AbstractRegionMap {
+// TODO: change back from public to package-private
+public class VMLRURegionMap extends AbstractRegionMap {
   private static final Logger logger = LogService.getLogger();
 
-  VMLRURegionMap(EvictableRegion owner, Attributes attr,
+  public VMLRURegionMap(EvictableRegion owner, Attributes attr,
       InternalRegionArguments internalRegionArgs) {
     this(owner, attr, internalRegionArgs, createEvictionController(owner, internalRegionArgs));
   }
 
-  /** used by unit tests */
-  VMLRURegionMap(EvictableRegion owner, Attributes attr, InternalRegionArguments internalRegionArgs,
-      EvictionController evictionController) {
+  public VMLRURegionMap(EvictableRegion owner, Attributes attr,
+      InternalRegionArguments internalRegionArgs, EvictionController evictionController) {
     super(internalRegionArgs);
     initialize(owner, attr, internalRegionArgs);
     this.evictionController = evictionController;
@@ -85,9 +85,13 @@ class VMLRURegionMap extends AbstractRegionMap {
     return evictionController;
   }
 
-  protected void initialize(EvictableRegion owner, Attributes attr,
+  protected void initialize(EvictableRegion evictableRegion, Attributes attr,
       InternalRegionArguments internalRegionArgs) {
-    super.initialize(owner, attr, internalRegionArgs, true);
+    if (evictableRegion instanceof InternalRegion) {
+      initialize((InternalRegion) evictableRegion, attr, internalRegionArgs, true);
+    } else {
+      initialize((PlaceHolderDiskRegion) evictableRegion, attr, internalRegionArgs, true);
+    }
   }
 
   @Override
@@ -717,13 +721,13 @@ class VMLRURegionMap extends AbstractRegionMap {
   }
 
   @Override
-  protected void lruEntryDestroy(RegionEntry re) {
-    final EvictableEntry e = (EvictableEntry) re;
+  public void lruEntryDestroy(RegionEntry regionEntry) {
+    final EvictableEntry e = (EvictableEntry) regionEntry;
     if (logger.isTraceEnabled(LogMarker.LRU)) {
       logger.trace(LogMarker.LRU,
           "lruEntryDestroy for key={}; list size is: {}; actual size is: {}; map size is: {}; entry size: {}; in lru clock: {}",
-          re.getKey(), getTotalEntrySize(), this.getEvictionList().size(), size(), e.getEntrySize(),
-          !e.isEvicted());
+          regionEntry.getKey(), getTotalEntrySize(), this.getEvictionList().size(), size(),
+          e.getEntrySize(), !e.isEvicted());
     }
 
     // if (this.lruCreatedKey == re.getKey()) {
@@ -734,7 +738,7 @@ class VMLRURegionMap extends AbstractRegionMap {
     getEvictionList().destroyEntry(e);
     // if (removed || wasEvicted) { // evicted entries have already been removed from the list
     changeTotalEntrySize(-1 * e.getEntrySize());// subtract the size.
-    Token vTok = re.getValueAsToken();
+    Token vTok = regionEntry.getValueAsToken();
     if (vTok == Token.DESTROYED || vTok == Token.TOMBSTONE) {
       // OFFHEAP noop TODO: use re.isDestroyedOrTombstone
       // if in token mode we need to recalculate the size of the entry since it's
@@ -791,9 +795,9 @@ class VMLRURegionMap extends AbstractRegionMap {
   }
 
   @Override
-  boolean confirmEvictionDestroy(RegionEntry re) {
+  public boolean confirmEvictionDestroy(RegionEntry regionEntry) {
     // We assume here that a LRURegionMap contains LRUEntries
-    EvictableEntry lruRe = (EvictableEntry) re;
+    EvictableEntry lruRe = (EvictableEntry) regionEntry;
     if (lruRe.isInUseByTransaction() || lruRe.isDestroyed()) {
       lruRe.unsetEvicted();
       return false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
index c831fa6..264f447 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
@@ -24,7 +24,7 @@ package org.apache.geode.internal.cache;
  */
 class VMRegionMap extends AbstractRegionMap {
 
-  VMRegionMap(Object owner, Attributes attr, InternalRegionArguments internalRegionArgs) {
+  VMRegionMap(RegionMapOwner owner, Attributes attr, InternalRegionArguments internalRegionArgs) {
     super(internalRegionArgs);
     initialize(owner, attr, internalRegionArgs, false/* isLRU */);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
index 98c4e6d..cc0908a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
@@ -130,11 +130,6 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor
   }
 
   @Override
-  public int calculateRegionEntryValueSize(RegionEntry re) {
-    return 0;
-  }
-
-  @Override
   public RegionMap getRegionMap() {
     throw new IllegalStateException("getRegionMap should not be called on ValidatingDiskRegion");
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
index 63ec46b..a57fd81 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
@@ -103,7 +103,7 @@ import org.apache.geode.pdx.internal.PdxInstanceImpl;
  *
  * @since GemFire 3.5.1
  */
-public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Object, Object> {
+public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Object> {
   private static final Logger logger = LogService.getLogger();
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
index 0801101..504d133 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
@@ -1314,7 +1314,7 @@ public interface DiskEntry extends RegionEntry {
     public static int overflowToDisk(DiskEntry entry, InternalRegion region,
         EvictionController ccHelper) throws RegionClearedException {
       DiskRegion dr = region.getDiskRegion();
-      final int oldSize = ((DiskRecoveryStore) region).calculateRegionEntryValueSize(entry);
+      final int oldSize = region.calculateRegionEntryValueSize(entry);
       // Get diskID . If it is null, it implies it is overflow only mode.
       DiskId did = entry.getDiskId();
       if (did == null) {
@@ -1440,8 +1440,7 @@ public interface DiskEntry extends RegionEntry {
               if (did.isPendingAsync()) {
                 did.setPendingAsync(false);
                 final Token entryVal = entry.getValueAsToken();
-                final int entryValSize =
-                    ((DiskRecoveryStore) region).calculateRegionEntryValueSize(entry);
+                final int entryValSize = region.calculateRegionEntryValueSize(entry);
                 try {
                   if (Token.isRemovedFromDisk(entryVal)) {
                     if (region.isThisRegionBeingClosedOrDestroyed())
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/HashRegionEntry.java
similarity index 65%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/entries/HashRegionEntry.java
index c831fa6..fd5fd81 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/HashRegionEntry.java
@@ -12,20 +12,10 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
+package org.apache.geode.internal.cache.entries;
 
-package org.apache.geode.internal.cache;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
 
-/**
- * Internal implementation of {@link RegionMap} for regions stored in normal VM memory.
- *
- * @since GemFire 3.5.1
- *
- *
- */
-class VMRegionMap extends AbstractRegionMap {
-
-  VMRegionMap(Object owner, Attributes attr, InternalRegionArguments internalRegionArgs) {
-    super(internalRegionArgs);
-    initialize(owner, attr, internalRegionArgs, false/* isLRU */);
-  }
+public interface HashRegionEntry<K, V> extends RegionEntry, HashEntry<K, V> {
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/CacheModificationLock.java
similarity index 60%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/map/CacheModificationLock.java
index c831fa6..7c10e61 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/VMRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/CacheModificationLock.java
@@ -12,20 +12,17 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
+package org.apache.geode.internal.cache.map;
 
-package org.apache.geode.internal.cache;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.InternalRegion;
 
-/**
- * Internal implementation of {@link RegionMap} for regions stored in normal VM memory.
- *
- * @since GemFire 3.5.1
- *
- *
- */
-class VMRegionMap extends AbstractRegionMap {
+public interface CacheModificationLock {
+
+  /** get version-generation permission from the region's version vector */
+  void lockForCacheModification(InternalRegion owner, EntryEventImpl event);
+
+  /** release version-generation permission from the region's version vector */
+  void releaseCacheModificationLock(InternalRegion owner, EntryEventImpl event);
 
-  VMRegionMap(Object owner, Attributes attr, InternalRegionArguments internalRegionArgs) {
-    super(internalRegionArgs);
-    initialize(owner, attr, internalRegionArgs, false/* isLRU */);
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/FocusedRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/FocusedRegionMap.java
new file mode 100644
index 0000000..5c0411c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/FocusedRegionMap.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import java.util.Map;
+
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.RegionEntryFactory;
+
+public interface FocusedRegionMap {
+
+  RegionEntry getEntry(EntryEventImpl event);
+
+  RegionEntryFactory getEntryFactory();
+
+  RegionEntry putEntryIfAbsent(Object key, RegionEntry regionEntry);
+
+  Map<Object, Object> getEntryMap();
+
+  boolean confirmEvictionDestroy(RegionEntry regionEntry); // TODO: subclass
+
+  void lruEntryDestroy(RegionEntry regionEntry); // TODO: subclass
+
+  void removeEntry(Object key, RegionEntry regionEntry, boolean updateStat);
+
+  void removeEntry(Object key, RegionEntry regionEntry, boolean updateStat, EntryEventImpl event,
+      final InternalRegion internalRegion);
+
+  void processVersionTag(RegionEntry regionEntry, EntryEventImpl event);
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
new file mode 100644
index 0000000..f303d21
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.cache.query.internal.index.IndexManager;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.RegionClearedException;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.internal.sequencelog.EntryLogger;
+
+/**
+ * RegionMap Destroy operation.
+ *
+ * <p>
+ * Extracted from AbstractRegionMap.
+ */
+public class RegionMapDestroy {
+
+  private static final Logger logger = LogService.getLogger();
+
+  static Runnable testHookRunnableForConcurrentOperation;
+
+  private final InternalRegion internalRegion;
+  private final FocusedRegionMap focusedRegionMap;
+  private final CacheModificationLock cacheModificationLock;
+
+  private EntryEventImpl event;
+  private boolean inTokenMode;
+  private boolean duringRI;
+  private boolean cacheWrite;
+  private boolean isEviction;
+  private Object expectedOldValue;
+  private boolean removeRecoveredEntry;
+
+  private boolean retry = true;
+  private boolean opCompleted = false;
+  private boolean doPart3 = false;
+  private boolean retainForConcurrency = false;
+
+  public RegionMapDestroy(InternalRegion internalRegion, FocusedRegionMap focusedRegionMap,
+      CacheModificationLock cacheModificationLock) {
+    this.internalRegion = internalRegion;
+    this.focusedRegionMap = focusedRegionMap;
+    this.cacheModificationLock = cacheModificationLock;
+  }
+
+  public boolean destroy(EntryEventImpl event, boolean inTokenMode, boolean duringRI,
+      boolean cacheWrite, boolean isEviction, Object expectedOldValue, boolean removeRecoveredEntry)
+      throws CacheWriterException, EntryNotFoundException, TimeoutException {
+
+    if (internalRegion == null) {
+      Assert.assertTrue(false, "The internalRegion for RegionMap " + this // "fix" for bug 32440
+          + " is null for event " + event);
+    }
+
+    this.event = event;
+    this.inTokenMode = inTokenMode;
+    this.duringRI = duringRI;
+    this.cacheWrite = cacheWrite;
+    this.isEviction = isEviction;
+    this.expectedOldValue = expectedOldValue;
+    this.removeRecoveredEntry = removeRecoveredEntry;
+
+    cacheModificationLock.lockForCacheModification(internalRegion, event);
+    try {
+
+      while (retry) {
+        retry = false;
+
+        opCompleted = false;
+
+        // We need to acquire the region entry while holding the lock to avoid #45620.
+        // The outer try/finally ensures that the lock will be released without fail.
+        // I'm avoiding indenting just to preserve the ability
+        // to track diffs since the code is fairly complex.
+
+        RegionEntry regionEntry = focusedRegionMap.getEntry(event);
+        RegionEntry tombstone = null;
+        boolean haveTombstone = false;
+        /*
+         * Execute the test hook runnable inline (not threaded) if it is not null.
+         */
+        if (null != testHookRunnableForConcurrentOperation) {
+          testHookRunnableForConcurrentOperation.run();
+        }
+
+        try {
+          if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT)
+              && !(internalRegion instanceof HARegion)) {
+            logger.trace(LogMarker.LRU_TOMBSTONE_COUNT,
+                "ARM.destroy() inTokenMode={}; duringRI={}; riLocalDestroy={}; withRepl={}; fromServer={}; concurrencyEnabled={}; isOriginRemote={}; isEviction={}; operation={}; re={}",
+                inTokenMode, duringRI, event.isFromRILocalDestroy(),
+                internalRegion.getDataPolicy().withReplication(), event.isFromServer(),
+                internalRegion.getConcurrencyChecksEnabled(), event.isOriginRemote(), isEviction,
+                event.getOperation(), regionEntry);
+          }
+          if (event.isFromRILocalDestroy()) {
+            // for RI local-destroy we don't want to keep tombstones.
+            // In order to simplify things we just set this recovery
+            // flag to true to force the entry to be removed
+            removeRecoveredEntry = true;
+          }
+          // the logic in this method is already very involved, and adding tombstone
+          // permutations to (re != null) greatly complicates it. So, we check
+          // for a tombstone here and, if found, pretend for a bit that the entry is null
+          if (regionEntry != null && regionEntry.isTombstone() && !removeRecoveredEntry) {
+            tombstone = regionEntry;
+            haveTombstone = true;
+            regionEntry = null;
+          }
+          IndexManager oqlIndexManager = internalRegion.getIndexManager();
+          if (regionEntry == null) {
+            // we need to create an entry if in token mode or if we've received
+            // a destroy from a peer or WAN gateway and we need to retain version
+            // information for concurrency checks
+            retainForConcurrency = (!haveTombstone
+                && (internalRegion.getDataPolicy().withReplication() || event.isFromServer())
+                && internalRegion.getConcurrencyChecksEnabled()
+                && (event.isOriginRemote() /* destroy received from other must create tombstone */
+                    || event.isFromWANAndVersioned() /* wan event must create a tombstone */
+                    || event.isBridgeEvent())); /*
+                                                 * event from client must create a tombstone so
+                                                 * client has a version #
+                                                 */
+            if (inTokenMode || retainForConcurrency) {
+              // removeRecoveredEntry should be false in this case
+              RegionEntry newRegionEntry = focusedRegionMap.getEntryFactory()
+                  .createEntry(internalRegion, event.getKey(), Token.REMOVED_PHASE1);
+              // Fix for Bug #44431. We do NOT want to update the region and wait
+              // later for index INIT as region.clear() can cause inconsistency if
+              // happened in parallel as it also does index INIT.
+              if (oqlIndexManager != null) {
+                oqlIndexManager.waitForIndexInit();
+              }
+              try {
+                synchronized (newRegionEntry) {
+                  RegionEntry oldRegionEntry =
+                      focusedRegionMap.putEntryIfAbsent(event.getKey(), newRegionEntry);
+                  // what is this doing?
+                  while (!opCompleted && oldRegionEntry != null) {
+                    synchronized (oldRegionEntry) {
+                      if (oldRegionEntry.isRemovedPhase2()) {
+                        internalRegion.getCachePerfStats().incRetries();
+                        focusedRegionMap.getEntryMap().remove(event.getKey(), oldRegionEntry);
+                        oldRegionEntry =
+                            focusedRegionMap.putEntryIfAbsent(event.getKey(), newRegionEntry);
+                      } else {
+                        event.setRegionEntry(oldRegionEntry);
+
+                        // Last transaction related eviction check. This should
+                        // prevent
+                        // transaction conflict (caused by eviction) when the entry
+                        // is being added to transaction state.
+                        if (isEviction) {
+                          if (!focusedRegionMap.confirmEvictionDestroy(oldRegionEntry)) {
+                            opCompleted = false;
+                            return opCompleted;
+                          }
+                        }
+                        try {
+                          // if concurrency checks are enabled, destroy will set the version tag
+                          boolean destroyed = destroyEntry(oldRegionEntry, event, inTokenMode,
+                              cacheWrite, expectedOldValue, false, removeRecoveredEntry);
+                          if (destroyed) {
+                            if (retainForConcurrency) {
+                              internalRegion.basicDestroyBeforeRemoval(oldRegionEntry, event);
+                            }
+                            internalRegion.basicDestroyPart2(oldRegionEntry, event, inTokenMode,
+                                false /* conflict with clear */, duringRI, true);
+                            focusedRegionMap.lruEntryDestroy(oldRegionEntry);
+                            doPart3 = true;
+                          }
+                        } catch (RegionClearedException rce) {
+                          // Ignore. The exception will ensure that we do not update
+                          // the LRU List
+                          internalRegion.basicDestroyPart2(oldRegionEntry, event, inTokenMode,
+                              true/* conflict with clear */, duringRI, true);
+                          doPart3 = true;
+                        } catch (ConcurrentCacheModificationException ccme) {
+                          // TODO: GEODE-3967: change will go here
+                          VersionTag tag = event.getVersionTag();
+                          if (tag != null && tag.isTimeStampUpdated()) {
+                            // Notify gateways of new time-stamp.
+                            internalRegion.notifyTimestampsToGateways(event);
+                          }
+                          throw ccme;
+                        }
+                        regionEntry = oldRegionEntry;
+                        opCompleted = true;
+                      }
+                    } // synchronized oldRegionEntry
+                  } // while
+                  if (!opCompleted) {
+                    // The following try has a finally that cleans up the newRegionEntry.
+                    // This is only needed if newRegionEntry was added to the map which only
+                    // happens if we didn't get completed with oldRegionEntry in the above while
+                    // loop.
+                    try {
+                      regionEntry = newRegionEntry;
+                      event.setRegionEntry(newRegionEntry);
+
+                      try {
+                        // if concurrency checks are enabled, destroy will set the version tag
+                        if (isEviction) {
+                          opCompleted = false;
+                          return opCompleted;
+                        }
+                        destroyEntryInternal(newRegionEntry, oldRegionEntry);
+                      } catch (RegionClearedException rce) {
+                        handleRegionClearedExceptionDuringDestroyEntryInternal(newRegionEntry);
+
+                      } catch (ConcurrentCacheModificationException ccme) {
+                        VersionTag tag = event.getVersionTag();
+                        if (tag != null && tag.isTimeStampUpdated()) {
+                          // Notify gateways of new time-stamp.
+                          internalRegion.notifyTimestampsToGateways(event);
+                        }
+                        throw ccme;
+                      }
+                      // Note no need for LRU work since the entry is destroyed
+                      // and will be removed when gii completes
+                    } finally {
+                      if (!opCompleted
+                          && !haveTombstone /* to fix bug 51583 do this for all operations */ ) {
+                        focusedRegionMap.removeEntry(event.getKey(), newRegionEntry, false);
+                      }
+                      if (!opCompleted && isEviction) {
+                        focusedRegionMap.removeEntry(event.getKey(), newRegionEntry, false);
+                      }
+                    }
+                  } // !opCompleted
+                } // synchronized newRegionEntry
+              } finally {
+                if (oqlIndexManager != null) {
+                  oqlIndexManager.countDownIndexUpdaters();
+                }
+              }
+            } // inTokenMode or tombstone creation
+            else {
+              if (!isEviction || internalRegion.getConcurrencyChecksEnabled()) {
+                // The following ensures that there is not a concurrent operation
+                // on the entry and leaves behind a tombstone if concurrencyChecksEnabled.
+                // It fixes bug #32467 by propagating the destroy to the server even though
+                // the entry isn't in the client
+                RegionEntry newRe = haveTombstone ? tombstone
+                    : focusedRegionMap.getEntryFactory().createEntry(internalRegion, event.getKey(),
+                        Token.REMOVED_PHASE1);
+                synchronized (newRe) {
+                  if (haveTombstone && !tombstone.isTombstone()) {
+                    // we have to check this again under synchronization since it may have changed
+                    retry = true;
+                    // retryEntry = tombstone; // leave this in place for debugging
+                    continue;
+                  }
+                  regionEntry = (RegionEntry) focusedRegionMap.getEntryMap()
+                      .putIfAbsent(event.getKey(), newRe);
+                  if (regionEntry != null && regionEntry != tombstone) {
+                    // concurrent change - try again
+                    retry = true;
+                    // retryEntry = tombstone; // leave this in place for debugging
+                    continue;
+                  } else if (!isEviction) {
+                    boolean throwex = false;
+                    EntryNotFoundException ex = null;
+                    try {
+                      if (!cacheWrite) {
+                        throwex = true;
+                      } else {
+                        try {
+                          if (!removeRecoveredEntry) {
+                            throwex =
+                                !internalRegion.bridgeWriteBeforeDestroy(event, expectedOldValue);
+                          }
+                        } catch (EntryNotFoundException e) {
+                          throwex = true;
+                          ex = e;
+                        }
+                      }
+                      if (throwex) {
+                        if (!event.isOriginRemote() && !event.getOperation().isLocal()
+                            && (event.isFromBridgeAndVersioned() || // if this is a replayed client
+                            // event that already has a
+                            // version
+                                event.isFromWANAndVersioned())) { // or if this is a WAN event that
+                          // has been applied in another
+                          // system
+                          // we must distribute these since they will update the version information
+                          // in peers
+                          if (logger.isDebugEnabled()) {
+                            logger.debug(
+                                "ARM.destroy is allowing wan/client destroy of {} to continue",
+                                event.getKey());
+                          }
+                          throwex = false;
+                          event.setIsRedestroyedEntry(true);
+                          // Distribution of this op happens on re and re might be null here before
+                          // distributing this destroy op.
+                          if (regionEntry == null) {
+                            regionEntry = newRe;
+                          }
+                          doPart3 = true;
+                        }
+                      }
+                      if (throwex) {
+                        if (ex == null) {
+                          // Fix for 48182, check cache state and/or region state before sending
+                          // entry not found.
+                          // this is from the server and any exceptions will propagate to the client
+                          internalRegion.checkEntryNotFound(event.getKey());
+                        } else {
+                          throw ex;
+                        }
+                      }
+                    } finally {
+                      // either remove the entry or leave a tombstone
+                      try {
+                        if (!event.isOriginRemote() && event.getVersionTag() != null
+                            && internalRegion.getConcurrencyChecksEnabled()) {
+                          // this shouldn't fail since we just created the entry.
+                          // it will either generate a tag or apply a server's version tag
+                          focusedRegionMap.processVersionTag(newRe, event);
+                          if (doPart3) {
+                            internalRegion.generateAndSetVersionTag(event, newRe);
+                          }
+                          try {
+                            internalRegion.recordEvent(event);
+                            newRe.makeTombstone(internalRegion, event.getVersionTag());
+                          } catch (RegionClearedException e) {
+                            // that's okay - when writing a tombstone into a disk, the
+                            // region has been cleared (including this tombstone)
+                          }
+                          opCompleted = true;
+                          // lruEntryCreate(newRe);
+                        } else if (!haveTombstone) {
+                          try {
+                            assert newRe != tombstone;
+                            newRe.setValue(internalRegion, Token.REMOVED_PHASE2);
+                            focusedRegionMap.removeEntry(event.getKey(), newRe, false);
+                          } catch (RegionClearedException e) {
+                            // that's okay - we just need to remove the new entry
+                          }
+                        } else if (event.getVersionTag() != null) { // haveTombstone - update the
+                          // tombstone version info
+                          focusedRegionMap.processVersionTag(tombstone, event);
+                          if (doPart3) {
+                            internalRegion.generateAndSetVersionTag(event, newRe);
+                          }
+                          // This is not a conflict; we need to persist the tombstone again with new
+                          // version tag
+                          try {
+                            tombstone.setValue(internalRegion, Token.TOMBSTONE);
+                          } catch (RegionClearedException e) {
+                            // that's okay - when writing a tombstone into a disk, the
+                            // region has been cleared (including this tombstone)
+                          }
+                          internalRegion.recordEvent(event);
+                          internalRegion.rescheduleTombstone(tombstone, event.getVersionTag());
+                          internalRegion.basicDestroyPart2(tombstone, event, inTokenMode,
+                              true /* conflict with clear */, duringRI, true);
+                          opCompleted = true;
+                        } else {
+                          Assert.assertTrue(event.getVersionTag() == null);
+                          Assert.assertTrue(newRe == tombstone);
+                          event.setVersionTag(getVersionTagFromStamp(tombstone.getVersionStamp()));
+                        }
+                      } catch (ConcurrentCacheModificationException ccme) {
+                        VersionTag tag = event.getVersionTag();
+                        if (tag != null && tag.isTimeStampUpdated()) {
+                          // Notify gateways of new time-stamp.
+                          internalRegion.notifyTimestampsToGateways(event);
+                        }
+                        throw ccme;
+                      }
+                    }
+                  }
+                } // synchronized(newRe)
+              }
+            }
+          } // no current entry
+          else { // current entry exists
+            if (oqlIndexManager != null) {
+              oqlIndexManager.waitForIndexInit();
+            }
+            try {
+              synchronized (regionEntry) {
+                internalRegion.checkReadiness();
+                // if the entry is a tombstone and the event is from a peer or a client
+                // then we allow the operation to be performed so that we can update the
+                // version stamp. Otherwise we would retain an old version stamp and may allow
+                // an operation that is older than the destroy() to be applied to the cache
+                // Bug 45170: If removeRecoveredEntry, we treat tombstone as regular entry to be
+                // deleted
+                boolean createTombstoneForConflictChecks =
+                    (internalRegion.getConcurrencyChecksEnabled() && (event.isOriginRemote()
+                        || event.getContext() != null || removeRecoveredEntry));
+                if (!regionEntry.isRemoved() || createTombstoneForConflictChecks) {
+                  if (regionEntry.isRemovedPhase2()) {
+                    focusedRegionMap.getEntryMap().remove(event.getKey(), regionEntry);
+                    internalRegion.getCachePerfStats().incRetries();
+                    retry = true;
+                    continue;
+                  }
+                  if (!event.isOriginRemote() && event.getOperation().isExpiration()) {
+                    // If this expiration started locally then only do it if the RE is not being
+                    // used by a tx.
+                    if (regionEntry.isInUseByTransaction()) {
+                      opCompleted = false;
+                      return opCompleted;
+                    }
+                  }
+                  event.setRegionEntry(regionEntry);
+
+                  // See comment above about eviction checks
+                  if (isEviction) {
+                    assert expectedOldValue == null;
+                    if (!focusedRegionMap.confirmEvictionDestroy(regionEntry)) {
+                      opCompleted = false;
+                      return opCompleted;
+                    }
+                  }
+
+                  boolean removed = false;
+                  try {
+                    opCompleted = destroyEntry(regionEntry, event, inTokenMode, cacheWrite,
+                        expectedOldValue, false, removeRecoveredEntry);
+                    if (opCompleted) {
+                      // It is very, very important for Partitioned Regions to keep
+                      // the entry in the map until after distribution occurs so that other
+                      // threads performing a create on this entry wait until the destroy
+                      // distribution is finished, keeping backup copies consistent.
+                      // Fix for bug 35906.
+                      internalRegion.basicDestroyBeforeRemoval(regionEntry, event);
+
+                      // do this before basicDestroyPart2 to fix bug 31786
+                      if (!inTokenMode) {
+                        if (regionEntry.getVersionStamp() == null) {
+                          regionEntry.removePhase2();
+                          focusedRegionMap.removeEntry(event.getKey(), regionEntry, true, event,
+                              internalRegion);
+                          removed = true;
+                        }
+                      }
+                      if (inTokenMode && !duringRI) {
+                        event.inhibitCacheListenerNotification(true);
+                      }
+                      doPart3 = true;
+                      internalRegion.basicDestroyPart2(regionEntry, event, inTokenMode,
+                          false /* conflict with clear */, duringRI, true);
+                      focusedRegionMap.lruEntryDestroy(regionEntry);
+                    } else {
+                      if (!inTokenMode) {
+                        EntryLogger.logDestroy(event);
+                        internalRegion.recordEvent(event);
+                        if (regionEntry.getVersionStamp() == null) {
+                          regionEntry.removePhase2();
+                          focusedRegionMap.removeEntry(event.getKey(), regionEntry, true, event,
+                              internalRegion);
+                          focusedRegionMap.lruEntryDestroy(regionEntry);
+                        } else {
+                          if (regionEntry.isTombstone()) {
+                            // the entry is already a tombstone, but we're destroying it
+                            // again, so we need to reschedule the tombstone's expiration
+                            if (event.isOriginRemote()) {
+                              internalRegion.rescheduleTombstone(regionEntry,
+                                  regionEntry.getVersionStamp().asVersionTag());
+                            }
+                          }
+                        }
+                        focusedRegionMap.lruEntryDestroy(regionEntry);
+                        opCompleted = true;
+                      }
+                    }
+                  } catch (RegionClearedException rce) {
+                    // Ignore. The exception will ensure that we do not update
+                    // the LRU List
+                    opCompleted = true;
+                    internalRegion.recordEvent(event);
+                    if (inTokenMode && !duringRI) {
+                      event.inhibitCacheListenerNotification(true);
+                    }
+                    internalRegion.basicDestroyPart2(regionEntry, event, inTokenMode,
+                        true /* conflict with clear */, duringRI, true);
+                    doPart3 = true;
+                  } finally {
+                    internalRegion.checkReadiness();
+                    if (regionEntry.isRemoved() && !regionEntry.isTombstone()) {
+                      if (!removed) {
+                        focusedRegionMap.removeEntry(event.getKey(), regionEntry, true, event,
+                            internalRegion);
+                      }
+                    }
+                  }
+                } // !isRemoved
+                else { // already removed
+                  if (regionEntry.isTombstone() && event.getVersionTag() != null) {
+                    // if we're dealing with a tombstone and this is a remote event
+                    // (e.g., from cache client update thread) we need to update
+                    // the tombstone's version information
+                    // TODO use destroyEntry() here
+                    focusedRegionMap.processVersionTag(regionEntry, event);
+                    try {
+                      regionEntry.makeTombstone(internalRegion, event.getVersionTag());
+                    } catch (RegionClearedException e) {
+                      // that's okay - when writing a tombstone into a disk, the
+                      // region has been cleared (including this tombstone)
+                    }
+                  }
+                  if (expectedOldValue != null) {
+                    // if re is removed then there is no old value, so return false
+                    return false;
+                  }
+
+                  if (!inTokenMode && !isEviction) {
+                    internalRegion.checkEntryNotFound(event.getKey());
+                  }
+                }
+              } // synchronized re
+            } catch (ConcurrentCacheModificationException ccme) {
+              VersionTag tag = event.getVersionTag();
+              if (tag != null && tag.isTimeStampUpdated()) {
+                // Notify gateways of new time-stamp.
+                internalRegion.notifyTimestampsToGateways(event);
+              }
+              throw ccme;
+            } finally {
+              if (oqlIndexManager != null) {
+                oqlIndexManager.countDownIndexUpdaters();
+              }
+            }
+            // No need to call lruUpdateCallback since the only lru action
+            // we may have taken was lruEntryDestroy. This fixes bug 31759.
+
+          } // current entry exists
+          if (opCompleted) {
+            EntryLogger.logDestroy(event);
+          }
+          return opCompleted;
+        } finally {
+          try {
+            // If concurrency conflict is there and event contains gateway version tag then
+            // do NOT distribute.
+            if (event.isConcurrencyConflict()
+                && (event.getVersionTag() != null && event.getVersionTag().isGatewayTag())) {
+              doPart3 = false;
+            }
+            // distribution and listener notification
+            if (doPart3) {
+              internalRegion.basicDestroyPart3(regionEntry, event, inTokenMode, duringRI, true,
+                  expectedOldValue);
+            }
+          } finally {
+            if (opCompleted) {
+              if (regionEntry != null) {
+                // We only want to cancel here when concurrency checks are not enabled.
+                // regionEntry will be null when concurrency checks are enabled; in that
+                // case the removeTombstone method will call cancelExpiryTask on the
+                // region entry.
+                internalRegion.cancelExpiryTask(regionEntry);
+              }
+            }
+          }
+        }
+
+      } // retry loop
+    } finally { // failsafe on the read lock...see comment above
+      cacheModificationLock.releaseCacheModificationLock(internalRegion, event);
+    }
+    return false;
+  }
+
+  private void handleRegionClearedExceptionDuringDestroyEntryInternal(RegionEntry newRegionEntry) {
+    // Ignore the exception itself: it ensures that we do not update the LRU List.
+    opCompleted = true; // the destroy is still reported as completed
+    EntryLogger.logDestroy(event);
+    internalRegion.basicDestroyPart2(newRegionEntry, event, inTokenMode, true, duringRI, true); // 4th arg: conflict with clear
+    doPart3 = true; // requests basicDestroyPart3 (distribution and listener notification)
+  }
+
+  private void destroyEntryInternal(RegionEntry newRegionEntry, RegionEntry oldRegionEntry)
+      throws RegionClearedException {
+    opCompleted = destroyEntry(newRegionEntry, event, inTokenMode, cacheWrite, expectedOldValue,
+        true, removeRecoveredEntry); // true is passed as destroyEntry's forceDestroy argument
+    if (opCompleted) {
+      // This is a new entry, created because we are in token mode or are accepting a
+      // destroy operation by adding a tombstone, so there is no old value.
+      // NOTE(review): the original note said updateSizeOnRemove need not be called, but
+      // destroyEntry() above already calls it whenever the destroy succeeds.
+      // internalRegion.recordEvent(event);
+      event.setIsRedestroyedEntry(true); // native clients need to know if the
+      // entry didn't exist
+      if (retainForConcurrency) {
+        internalRegion.basicDestroyBeforeRemoval(oldRegionEntry, event);
+      }
+      internalRegion.basicDestroyPart2(newRegionEntry, event, inTokenMode, false, duringRI, true); // 4th arg: no conflict with clear
+      doPart3 = true; // requests basicDestroyPart3 (distribution and listener notification)
+    }
+  }
+
+  private boolean destroyEntry(RegionEntry re, EntryEventImpl event, boolean inTokenMode,
+      boolean cacheWrite, @Released Object expectedOldValue, boolean forceDestroy,
+      boolean removeRecoveredEntry) throws CacheWriterException, TimeoutException,
+      EntryNotFoundException, RegionClearedException {
+    focusedRegionMap.processVersionTag(re, event); // version tag is processed before the entry is destroyed
+    final int oldSize = internalRegion.calculateRegionEntryValueSize(re); // captured before re.destroy() runs
+    boolean retVal = re.destroy(event.getLocalRegion(), event, inTokenMode, cacheWrite,
+        expectedOldValue, forceDestroy, removeRecoveredEntry);
+    if (retVal) { // only a successful destroy is logged and has its pre-destroy size reported
+      EntryLogger.logDestroy(event);
+      internalRegion.updateSizeOnRemove(event.getKey(), oldSize);
+    }
+    return retVal; // the result of re.destroy(...), unchanged
+  }
+
+  private VersionTag getVersionTagFromStamp(VersionStamp stamp) { // builds a new tag from the stamp's fields
+    VersionTag tag = VersionTag.create(stamp.getMemberID()); // created for the stamp's member id
+    tag.setEntryVersion(stamp.getEntryVersion());
+    tag.setRegionVersion(stamp.getRegionVersion());
+    tag.setVersionTimeStamp(stamp.getVersionTimeStamp());
+    tag.setDistributedSystemId(stamp.getDistributedSystemId());
+    return tag;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskRecoveryStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskRecoveryStore.java
index 9a3890c..a9ce682 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskRecoveryStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskRecoveryStore.java
@@ -52,8 +52,6 @@ public interface DiskRecoveryStore {
 
   public int calculateValueSize(Object val);
 
-  public int calculateRegionEntryValueSize(RegionEntry re);
-
   public RegionMap getRegionMap();
 
   public void handleDiskAccessException(DiskAccessException dae);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/ConcurrentMapWithReusableEntries.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/ConcurrentMapWithReusableEntries.java
new file mode 100644
index 0000000..a1f9b05
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/ConcurrentMapWithReusableEntries.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.util.concurrent;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+public interface ConcurrentMapWithReusableEntries<K, V> extends ConcurrentMap<K, V> {
+
+  /**
+   * Returns a {@link Set} view of the mappings contained in this map. The set is backed by the map,
+   * so changes to the map are reflected in the set, and vice-versa. The set supports element
+   * removal, which removes the corresponding mapping from the map, via the
+   * {@code Iterator.remove}, {@code Set.remove}, {@code removeAll}, {@code retainAll}, and
+   * {@code clear} operations. It does not support the {@code add} or {@code addAll} operations.
+   *
+   * <p>
+   * The view's {@code iterator} is a "weakly consistent" iterator that will never throw
+   * {@link java.util.ConcurrentModificationException}. It is guaranteed to traverse elements as
+   * they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any
+   * modifications subsequent to construction.
+   *
+   * <p>
+   * This set provides entries that are reused during iteration, so callers must not store the
+   * returned {@code Map.Entry} objects.
+   */
+  Set<Map.Entry<K, V>> entrySetWithReusableEntries();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
index ed92f65..82756da 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
@@ -118,7 +118,7 @@ import org.apache.geode.internal.util.ArrayUtils;
  * @param <V> the type of mapped values
  */
 public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
-    implements ConcurrentMap<K, V>, Serializable {
+    implements ConcurrentMapWithReusableEntries<K, V>, Serializable {
 
   private static final long serialVersionUID = -7056732555635108300L;
 
@@ -1892,23 +1892,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
   // GemStone addition
 
-  /**
-   * Returns a {@link Set} view of the mappings contained in this map. The set is backed by the map,
-   * so changes to the map are reflected in the set, and vice-versa. The set supports element
-   * removal, which removes the corresponding mapping from the map, via the
-   * <tt>Iterator.remove</tt>, <tt>Set.remove</tt>, <tt>removeAll</tt>, <tt>retainAll</tt>, and
-   * <tt>clear</tt> operations. It does not support the <tt>add</tt> or <tt>addAll</tt> operations.
-   *
-   * <p>
-   * The view's <tt>iterator</tt> is a "weakly consistent" iterator that will never throw
-   * {@link java.util.ConcurrentModificationException}, and guarantees to traverse elements as they
-   * existed upon construction of the iterator, and may (but is not guaranteed to) reflect any
-   * modifications subsequent to construction.
-   *
-   * <p>
-   * This set provides entries that are reused during iteration so caller cannot store the returned
-   * <code>Map.Entry</code> objects.
-   */
+  @Override
   public Set<Map.Entry<K, V>> entrySetWithReusableEntries() {
     final Set<Map.Entry<K, V>> es = this.reusableEntrySet;
     return (es != null) ? es : (this.reusableEntrySet = new EntrySet(true));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ARMLockTestHookAdapter.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ARMLockTestHookAdapter.java
index 4e9cd9c..afebe40 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ARMLockTestHookAdapter.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ARMLockTestHookAdapter.java
@@ -15,28 +15,26 @@
 package org.apache.geode.internal.cache;
 
 import java.io.Serializable;
-import java.util.concurrent.CountDownLatch;
 
 import org.apache.geode.cache.CacheEvent;
-import org.apache.geode.test.dunit.VM;
 
 public class ARMLockTestHookAdapter implements AbstractRegionMap.ARMLockTestHook, Serializable {
 
-  public void beforeBulkLock(LocalRegion region) {};
+  public void beforeBulkLock(InternalRegion region) {}; // every hook in this adapter has an empty body (no-op)
 
-  public void afterBulkLock(LocalRegion region) {};
+  public void afterBulkLock(InternalRegion region) {};
 
-  public void beforeBulkRelease(LocalRegion region) {};
+  public void beforeBulkRelease(InternalRegion region) {};
 
-  public void afterBulkRelease(LocalRegion region) {};
+  public void afterBulkRelease(InternalRegion region) {};
 
-  public void beforeLock(LocalRegion region, CacheEvent event) {};
+  public void beforeLock(InternalRegion region, CacheEvent event) {};
 
-  public void afterLock(LocalRegion region, CacheEvent event) {};
+  public void afterLock(InternalRegion region, CacheEvent event) {};
 
-  public void beforeRelease(LocalRegion region, CacheEvent event) {};
+  public void beforeRelease(InternalRegion region, CacheEvent event) {};
 
-  public void afterRelease(LocalRegion region, CacheEvent event) {};
+  public void afterRelease(InternalRegion region, CacheEvent event) {};
 
   public void beforeStateFlushWait() {}
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionJUnitTest.java
index de34733..8bb7106 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionJUnitTest.java
@@ -14,48 +14,15 @@
  */
 package org.apache.geode.internal.cache;
 
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.Statistics;
-import org.apache.geode.cache.AttributesMutator;
-import org.apache.geode.cache.CacheLoaderException;
-import org.apache.geode.cache.CacheWriterException;
-import org.apache.geode.cache.EntryExistsException;
-import org.apache.geode.cache.EntryNotFoundException;
-import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
-import org.apache.geode.cache.RegionExistsException;
-import org.apache.geode.cache.TimeoutException;
-import org.apache.geode.cache.client.internal.ServerRegionProxy;
-import org.apache.geode.cache.query.FunctionDomainException;
-import org.apache.geode.cache.query.NameResolutionException;
-import org.apache.geode.cache.query.QueryInvocationTargetException;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.TypeMismatchException;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.extension.ExtensionPoint;
 import org.apache.geode.internal.cache.extension.SimpleExtensionPoint;
-import org.apache.geode.internal.cache.versions.RegionVersionVector;
-import org.apache.geode.internal.cache.versions.VersionSource;
-import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 /**
@@ -74,632 +41,9 @@ public class AbstractRegionJUnitTest {
    * rest.
    */
   @Test
-  public void testGetExtensionPoint() {
-    // final Cache cache = new MockCache();
-    final AbstractRegion region = new MockRegion(null, 0, false, 0, 0);
-    final ExtensionPoint<Region<?, ?>> extensionPoint = region.getExtensionPoint();
-    assertNotNull(extensionPoint);
-    assertEquals(extensionPoint.getClass(), SimpleExtensionPoint.class);
-  }
-
-  @SuppressWarnings("rawtypes")
-  private static class MockRegion extends AbstractRegion {
-
-    /**
-     * @see AbstractRegion#AbstractRegion(InternalCache, int, boolean, long, long)
-     */
-    @SuppressWarnings("deprecation")
-    private MockRegion(GemFireCacheImpl cache, int serialNumber, boolean isPdxTypeRegion,
-        long lastAccessedTime, long lastModifiedTime) {
-      super(cache, serialNumber, isPdxTypeRegion, lastAccessedTime, lastModifiedTime);
-    }
-
-    @Override
-    public String getName() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String getFullPath() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Region getParentRegion() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RegionAttributes getAttributes() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public AttributesMutator getAttributesMutator() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void invalidateRegion(Object aCallbackArgument) throws TimeoutException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void localInvalidateRegion(Object aCallbackArgument) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void destroyRegion(Object aCallbackArgument)
-        throws CacheWriterException, TimeoutException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void localDestroyRegion(Object aCallbackArgument) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void saveSnapshot(OutputStream outputStream) throws IOException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void loadSnapshot(InputStream inputStream)
-        throws IOException, ClassNotFoundException, CacheWriterException, TimeoutException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Region getSubregion(String path) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Region createSubregion(String subregionName, RegionAttributes aRegionAttributes)
-        throws RegionExistsException, TimeoutException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set subregions(boolean recursive) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Entry getEntry(Object key) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object put(Object key, Object value, Object aCallbackArgument)
-        throws TimeoutException, CacheWriterException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void create(Object key, Object value, Object aCallbackArgument)
-        throws TimeoutException, EntryExistsException, CacheWriterException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void invalidate(Object key, Object aCallbackArgument)
-        throws TimeoutException, EntryNotFoundException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void localInvalidate(Object key, Object aCallbackArgument)
-        throws EntryNotFoundException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object destroy(Object key, Object aCallbackArgument)
-        throws TimeoutException, EntryNotFoundException, CacheWriterException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void localDestroy(Object key, Object aCallbackArgument) throws EntryNotFoundException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set keySet() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Collection values() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set entrySet(boolean recursive) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object getUserAttribute() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void setUserAttribute(Object value) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isDestroyed() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean containsValueForKey(Object key) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean containsKey(Object key) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Lock getRegionDistributedLock() throws IllegalStateException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Lock getDistributedLock(Object key) throws IllegalStateException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void writeToDisk() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public SelectResults query(String queryPredicate) throws FunctionDomainException,
-        TypeMismatchException, NameResolutionException, QueryInvocationTargetException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void forceRolling() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean containsValue(Object value) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set entrySet() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isEmpty() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void putAll(Map map) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void putAll(Map map, Object aCallbackArgument) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void removeAll(Collection keys) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void removeAll(Collection keys, Object aCallbackArgument) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object remove(Object key) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int size() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void registerInterest(Object key) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterest(Object key, InterestResultPolicy policy) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterestRegex(String regex) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterestRegex(String regex, InterestResultPolicy policy) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void unregisterInterest(Object key) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void unregisterInterestRegex(String regex) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public List getInterestList() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void registerInterest(Object key, boolean isDurable) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterest(Object key, boolean isDurable, boolean receiveValues) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterest(Object key, InterestResultPolicy policy, boolean isDurable,
-        boolean receiveValues) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterest(Object key, InterestResultPolicy policy, boolean isDurable) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterestRegex(String regex, boolean isDurable) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterestRegex(String regex, boolean isDurable, boolean receiveValues) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterestRegex(String regex, InterestResultPolicy policy,
-        boolean isDurable) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void registerInterestRegex(String regex, InterestResultPolicy policy, boolean isDurable,
-        boolean receiveValues) {
-      throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public List getInterestListRegex() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set keySetOnServer() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean containsKeyOnServer(Object key) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int sizeOnServer() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isEmptyOnServer() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object putIfAbsent(Object key, Object value) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean remove(Object key, Object value) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean replace(Object key, Object oldValue, Object newValue) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object replace(Object key, Object value) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int[] getDiskDirSizes() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Version[] getSerializationVersions() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public CachePerfStats getCachePerfStats() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RegionEntry getRegionEntry(final Object key) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RegionVersionVector getVersionVector() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object getValueInVM(Object key) throws EntryNotFoundException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object getValueOnDisk(Object key) throws EntryNotFoundException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void dispatchListenerEvent(EnumListenerEvent op, InternalCacheEvent event) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isUsedForPartitionedRegionAdmin() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ImageState getImageState() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public VersionSource getVersionMember() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long updateStatsForPut(RegionEntry entry, long lastModified, boolean lruRecentUse) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public FilterProfile getFilterProfile() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ServerRegionProxy getServerProxy() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void unscheduleTombstone(RegionEntry entry) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void scheduleTombstone(RegionEntry entry, VersionTag destroyedVersion) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void addExpiryTaskIfAbsent(RegionEntry entry) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void generateAndSetVersionTag(InternalCacheEvent event, RegionEntry entry) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean cacheWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
-        throws CacheWriterException, EntryNotFoundException, TimeoutException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void recordEvent(InternalCacheEvent event) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isThisRegionBeingClosedOrDestroyed() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DiskRegion getDiskRegion() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public CancelCriterion getCancelCriterion() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int updateSizeOnEvict(Object key, int oldSize) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    Object get(Object key, Object aCallbackArgument, boolean generateCallbacks,
-        EntryEventImpl clientEvent) throws TimeoutException, CacheLoaderException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    void basicClear(RegionEventImpl regionEvent) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    boolean generateEventID() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected InternalDistributedMember getMyId() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    void basicLocalClear(RegionEventImpl rEvent) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    Map basicGetAll(Collection keys, Object callback) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RegionEntry basicGetEntry(Object key) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void invokePutCallbacks(EnumListenerEvent eventType, EntryEventImpl event,
-        boolean callDispatchListenerEvent, boolean notifyGateways) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void invokeDestroyCallbacks(EnumListenerEvent eventType, EntryEventImpl event,
-        boolean callDispatchListenerEvent, boolean notifyGateways) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void invokeInvalidateCallbacks(EnumListenerEvent eventType, EntryEventImpl event,
-        boolean callDispatchListenerEvent) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getTotalEvictions() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Region createSubregion(String subregionName, RegionAttributes attrs,
-        InternalRegionArguments internalRegionArgs)
-        throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean isCurrentlyLockGrantor() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public File[] getDiskDirs() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    void checkReadiness() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean supportsConcurrencyChecks() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void addCacheServiceProfile(CacheServiceProfile profile) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void setEvictionMaximum(int maximum) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Statistics getEvictionStatistics() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getEvictionCounter() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RegionMap getRegionMap() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public EvictionController getExistingController(InternalRegionArguments internalArgs) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String getNameForStats() {
-      throw new UnsupportedOperationException();
-    }
+  public void extensionPointIsSimpleExtensionPointByDefault() {
+    AbstractRegion region = spy(AbstractRegion.class);
+    ExtensionPoint<Region<?, ?>> extensionPoint = region.getExtensionPoint();
+    assertThat(extensionPoint).isNotNull().isInstanceOf(SimpleExtensionPoint.class);
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index 6885bfd..d3053b0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -30,15 +30,14 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.junit.After;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.eviction.EvictableEntry;
 import org.apache.geode.internal.cache.eviction.EvictionController;
@@ -46,6 +45,7 @@ import org.apache.geode.internal.cache.eviction.EvictionCounters;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionHolder;
 import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
 import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -54,6 +54,11 @@ public class AbstractRegionMapTest {
 
   private static final Object KEY = "key";
 
+  @After
+  public void tearDown() {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
+  }
+
   @Test
   public void shouldBeMockable() throws Exception {
     AbstractRegionMap mockAbstractRegionMap = mock(AbstractRegionMap.class);
@@ -83,19 +88,14 @@ public class AbstractRegionMapTest {
   @Test
   public void invalidateOfNonExistentRegionThrowsEntryNotFoundWithForce() {
     AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
-    try {
-      TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
-      EntryEventImpl event = createEventForInvalidate(arm._getOwner());
-      when(arm._getOwner().isInitialized()).thenReturn(true);
+    TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+    EntryEventImpl event = createEventForInvalidate(arm._getOwner());
+    when(arm._getOwner().isInitialized()).thenReturn(true);
 
-      assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
-          .isInstanceOf(EntryNotFoundException.class);
-      verify(arm._getOwner(), never()).basicInvalidatePart2(any(), any(), anyBoolean(),
-          anyBoolean());
-      verify(arm._getOwner(), times(1)).invokeInvalidateCallbacks(any(), any(), anyBoolean());
-    } finally {
-      AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
-    }
+    assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
+        .isInstanceOf(EntryNotFoundException.class);
+    verify(arm._getOwner(), never()).basicInvalidatePart2(any(), any(), anyBoolean(), anyBoolean());
+    verify(arm._getOwner(), times(1)).invokeInvalidateCallbacks(any(), any(), anyBoolean());
   }
 
   @Test
@@ -117,33 +117,18 @@ public class AbstractRegionMapTest {
   @Test
   public void invalidateOfAlreadyInvalidEntryReturnsFalseWithForce() {
     AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
-    try {
-      TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
-      EntryEventImpl event = createEventForInvalidate(arm._getOwner());
-
-      // invalidate on region that is not initialized should create
-      // entry in map as invalid.
-      assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
-          .isInstanceOf(EntryNotFoundException.class);
-
-      when(arm._getOwner().isInitialized()).thenReturn(true);
-      assertFalse(arm.invalidate(event, true, false, false));
-      verify(arm._getOwner(), never()).basicInvalidatePart2(any(), any(), anyBoolean(),
-          anyBoolean());
-      verify(arm._getOwner(), times(1)).invokeInvalidateCallbacks(any(), any(), anyBoolean());
-    } finally {
-      AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
-    }
-  }
+    TestableAbstractRegionMap arm = new TestableAbstractRegionMap();
+    EntryEventImpl event = createEventForInvalidate(arm._getOwner());
 
-  private EntryEventImpl createEventForInvalidate(LocalRegion lr) {
-    when(lr.getKeyInfo(KEY)).thenReturn(new KeyInfo(KEY, null, null));
-    return EntryEventImpl.create(lr, Operation.INVALIDATE, KEY, false, null, true, false);
-  }
+    // invalidate on region that is not initialized should create
+    // entry in map as invalid.
+    assertThatThrownBy(() -> arm.invalidate(event, true, false, false))
+        .isInstanceOf(EntryNotFoundException.class);
 
-  private EntryEventImpl createEventForDestroy(LocalRegion lr) {
-    when(lr.getKeyInfo(KEY)).thenReturn(new KeyInfo(KEY, null, null));
-    return EntryEventImpl.create(lr, Operation.DESTROY, KEY, false, null, true, false);
+    when(arm._getOwner().isInitialized()).thenReturn(true);
+    assertFalse(arm.invalidate(event, true, false, false));
+    verify(arm._getOwner(), never()).basicInvalidatePart2(any(), any(), anyBoolean(), anyBoolean());
+    verify(arm._getOwner(), times(1)).invokeInvalidateCallbacks(any(), any(), anyBoolean());
   }
 
   @Test
@@ -205,8 +190,8 @@ public class AbstractRegionMapTest {
     final boolean duringRI = false;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.DESTROYED);
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
@@ -250,7 +235,7 @@ public class AbstractRegionMapTest {
     final boolean evict = true;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, evict, expectedOldValue, false))
         .isFalse();
-    assertThat(arm._getMap().containsKey(event.getKey())).isFalse();
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isFalse();
     verify(arm._getOwner(), never()).basicDestroyPart2(any(), any(), anyBoolean(), anyBoolean(),
         anyBoolean(), anyBoolean());
     verify(arm._getOwner(), never()).basicDestroyPart3(any(), any(), anyBoolean(), anyBoolean(),
@@ -268,8 +253,8 @@ public class AbstractRegionMapTest {
     final boolean evict = true;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, evict, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.DESTROYED);
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
@@ -424,8 +409,8 @@ public class AbstractRegionMapTest {
     final boolean duringRI = false;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.DESTROYED);
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
@@ -449,8 +434,8 @@ public class AbstractRegionMapTest {
     final boolean duringRI = false;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     // why not DESTROY token?
     assertThat(re.getValueAsToken()).isEqualTo(Token.TOMBSTONE);
     // since it was already destroyed why do we do the parts?
@@ -495,7 +480,7 @@ public class AbstractRegionMapTest {
     final boolean removeRecoveredEntry = true;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue,
         removeRecoveredEntry)).isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isFalse();
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isFalse();
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
         eq(false), eq(duringRI), eq(invokeCallbacks));
@@ -531,7 +516,7 @@ public class AbstractRegionMapTest {
     final boolean duringRI = false;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isFalse();
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isFalse();
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
         eq(false), eq(duringRI), eq(invokeCallbacks));
@@ -551,7 +536,7 @@ public class AbstractRegionMapTest {
         .isTrue();
     // This might be a bug. It seems like we should have created a tombstone but we have no
     // version tag so that might be the cause of this bug.
-    assertThat(arm._getMap().containsKey(event.getKey())).isFalse();
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isFalse();
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
         eq(false), eq(duringRI), eq(invokeCallbacks));
@@ -574,8 +559,8 @@ public class AbstractRegionMapTest {
     final boolean duringRI = false;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.TOMBSTONE);
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
@@ -600,8 +585,8 @@ public class AbstractRegionMapTest {
     final boolean evict = true;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, evict, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.TOMBSTONE);
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
@@ -610,16 +595,6 @@ public class AbstractRegionMapTest {
         eq(duringRI), eq(invokeCallbacks), eq(expectedOldValue));
   }
 
-
-  private void addEntry(AbstractRegionMap arm) {
-    addEntry(arm, "value");
-  }
-
-  private void addEntry(AbstractRegionMap arm, Object value) {
-    RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(), KEY, value);
-    arm._getMap().put(KEY, entry);
-  }
-
   @Test
   public void destroyWithEmptyRegionWithConcurrencyChecksThrowsException() {
     TestableAbstractRegionMap arm = new TestableAbstractRegionMap(true);
@@ -639,8 +614,8 @@ public class AbstractRegionMapTest {
         anyBoolean(), any());
     // This seems to be a bug. We should not leave an entry in the map
     // added by the destroy call if destroy returns false.
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.REMOVED_PHASE1);
   }
 
@@ -653,7 +628,7 @@ public class AbstractRegionMapTest {
         anyBoolean(), anyBoolean());
     verify(arm._getOwner(), never()).basicDestroyPart3(any(), any(), anyBoolean(), anyBoolean(),
         anyBoolean(), any());
-    assertThat(arm._getMap().containsKey(event.getKey())).isFalse();
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isFalse();
   }
 
   @Test
@@ -671,8 +646,8 @@ public class AbstractRegionMapTest {
     final boolean duringRI = false;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.TOMBSTONE);
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
@@ -691,7 +666,7 @@ public class AbstractRegionMapTest {
     final boolean duringRI = false;
     assertThat(arm.destroy(event, inTokenMode, duringRI, false, false, expectedOldValue, false))
         .isTrue();
-    assertThat(arm._getMap().containsKey(event.getKey())).isTrue();
+    assertThat(arm.getEntryMap().containsKey(event.getKey())).isTrue();
     boolean invokeCallbacks = true;
     verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
         eq(false), eq(duringRI), eq(invokeCallbacks));
@@ -702,10 +677,60 @@ public class AbstractRegionMapTest {
     // that calls removePhase1 when the versionTag is null.
     // It seems like this code path needs to tell the higher levels
     // to call removeEntry
-    RegionEntry re = (RegionEntry) arm._getMap().get(event.getKey());
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
     assertThat(re.getValueAsToken()).isEqualTo(Token.REMOVED_PHASE1);
   }
 
+  @Test
+  public void txApplyInvalidateDoesNotInvalidateRemovedToken() throws RegionClearedException {
+    TxTestableAbstractRegionMap arm = new TxTestableAbstractRegionMap();
+
+    Object newValue = "value";
+    arm.txApplyPut(Operation.CREATE, KEY, newValue, false,
+        new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
+        mock(EventID.class), null, null, null, null, null, null, 1);
+    RegionEntry re = arm.getEntry(KEY);
+    assertNotNull(re);
+
+    Token[] removedTokens =
+        {Token.REMOVED_PHASE2, Token.REMOVED_PHASE1, Token.DESTROYED, Token.TOMBSTONE};
+
+    for (Token token : removedTokens) {
+      verifyTxApplyInvalidate(arm, KEY, re, token);
+    }
+  }
+
+  private EntryEventImpl createEventForInvalidate(LocalRegion lr) {
+    when(lr.getKeyInfo(KEY)).thenReturn(new KeyInfo(KEY, null, null));
+    return EntryEventImpl.create(lr, Operation.INVALIDATE, KEY, false, null, true, false);
+  }
+
+  private EntryEventImpl createEventForDestroy(LocalRegion lr) {
+    when(lr.getKeyInfo(KEY)).thenReturn(new KeyInfo(KEY, null, null));
+    return EntryEventImpl.create(lr, Operation.DESTROY, KEY, false, null, true, false);
+  }
+
+  private void addEntry(AbstractRegionMap arm) {
+    addEntry(arm, "value");
+  }
+
+  private void addEntry(AbstractRegionMap arm, Object value) {
+    RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(), KEY, value);
+    arm.getEntryMap().put(KEY, entry);
+  }
+
+  private void verifyTxApplyInvalidate(TxTestableAbstractRegionMap arm, Object key, RegionEntry re,
+      Token token) throws RegionClearedException {
+    re.setValue(arm._getOwner(), token); // force the removed-token state under test
+    arm.txApplyInvalidate(key, Token.INVALID, false,
+        new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class), false,
+        mock(EventID.class), null, null, null, null, null, null, 1);
+    assertEquals(token, re.getValueAsToken()); // JUnit order is (expected, actual)
+  }
+
+  /**
+   * AbstractRegionMap backed by a mocked LocalRegion owner, for use by these unit tests.
+   */
   private static class TestableAbstractRegionMap extends AbstractRegionMap {
 
     protected TestableAbstractRegionMap() {
@@ -717,7 +742,7 @@ public class AbstractRegionMapTest {
     }
 
     protected TestableAbstractRegionMap(boolean withConcurrencyChecks,
-        CustomEntryConcurrentHashMap map, RegionEntryFactory factory) {
+        ConcurrentMapWithReusableEntries map, RegionEntryFactory factory) {
       super(null);
       LocalRegion owner = mock(LocalRegion.class);
       CachePerfStats cachePerfStats = mock(CachePerfStats.class);
@@ -727,14 +752,17 @@ public class AbstractRegionMapTest {
       doThrow(EntryNotFoundException.class).when(owner).checkEntryNotFound(any());
       initialize(owner, new Attributes(), null, false);
       if (map != null) {
-        this._setMap(map);
+        setEntryMap(map);
       }
       if (factory != null) {
-        this.setEntryFactory(factory);
+        setEntryFactory(factory);
       }
     }
   }
 
+  /**
+   * VMLRURegionMap test double that declares LRU-entry eviction attributes.
+   */
   private static class TestableVMLRURegionMap extends VMLRURegionMap {
     private static EvictionAttributes evictionAttributes =
         EvictionAttributes.createLRUEntryAttributes();
@@ -767,12 +795,15 @@ public class AbstractRegionMapTest {
     }
 
     protected TestableVMLRURegionMap(boolean withConcurrencyChecks,
-        CustomEntryConcurrentHashMap hashMap) {
+        ConcurrentMapWithReusableEntries hashMap) {
       this(withConcurrencyChecks);
-      this._setMap(hashMap);
+      setEntryMap(hashMap);
     }
   }
 
+  /**
+   * AbstractRegionMap test double used by the txApplyPut/txApplyInvalidate test.
+   */
   private static class TxTestableAbstractRegionMap extends AbstractRegionMap {
 
     protected TxTestableAbstractRegionMap() {
@@ -785,33 +816,4 @@ public class AbstractRegionMapTest {
     }
   }
 
-  @Test
-  public void txApplyInvalidateDoesNotInvalidateRemovedToken() throws RegionClearedException {
-    TxTestableAbstractRegionMap arm = new TxTestableAbstractRegionMap();
-
-    Object newValue = "value";
-    arm.txApplyPut(Operation.CREATE, KEY, newValue, false,
-        new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
-        mock(EventID.class), null, null, null, null, null, null, 1);
-    RegionEntry re = arm.getEntry(KEY);
-    assertNotNull(re);
-
-    Token[] removedTokens =
-        {Token.REMOVED_PHASE2, Token.REMOVED_PHASE1, Token.DESTROYED, Token.TOMBSTONE};
-
-    for (Token token : removedTokens) {
-      verifyTxApplyInvalidate(arm, KEY, re, token);
-    }
-
-  }
-
-  private void verifyTxApplyInvalidate(TxTestableAbstractRegionMap arm, Object key, RegionEntry re,
-      Token token) throws RegionClearedException {
-    re.setValue(arm._getOwner(), token);
-    arm.txApplyInvalidate(key, Token.INVALID, false,
-        new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class), false,
-        mock(EventID.class), null, null, null, null, null, null, 1);
-    assertEquals(re.getValueAsToken(), token);
-  }
-
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearRvvLockingDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearRvvLockingDUnitTest.java
index bc9431b..e97c531 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearRvvLockingDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearRvvLockingDUnitTest.java
@@ -624,7 +624,7 @@ public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase {
    */
   public static class ArmBasicClearHook extends ARMLockTestHookAdapter {
     @Override
-    public void afterRelease(LocalRegion owner, CacheEvent event) {
+    public void afterRelease(InternalRegion owner, CacheEvent event) {
       if ((event.getOperation().isCreate()) && owner.getName().startsWith("test")) {
         invokeClear(theOtherVM);
       }
@@ -637,7 +637,7 @@ public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase {
   public static class ArmRemoveAndInvalidateClearHook extends ARMLockTestHookAdapter {
 
     @Override
-    public void afterRelease(LocalRegion owner, CacheEvent event) {
+    public void afterRelease(InternalRegion owner, CacheEvent event) {
       if ((event.getOperation().isDestroy() || event.getOperation().isInvalidate())
           && owner.getName().startsWith("test")) {
         invokeClear(theOtherVM);
@@ -650,7 +650,7 @@ public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase {
    */
   public static class ArmNoAckRemoteHook extends ARMLockTestHookAdapter {
     @Override
-    public void beforeLock(LocalRegion owner, CacheEvent event) {
+    public void beforeLock(InternalRegion owner, CacheEvent event) {
       if (event.isOriginRemote() && event.getOperation().isCreate()
           && owner.getName().startsWith("test")) {
         theOtherVM.invoke(() -> releaseStep1()); // start clear
@@ -671,7 +671,7 @@ public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase {
    */
   public static class ArmBulkClearHook extends ARMLockTestHookAdapter {
     @Override
-    public void afterBulkRelease(LocalRegion region) {
+    public void afterBulkRelease(InternalRegion region) {
       invokeClear(theOtherVM);
     }
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
index a270675..b800171 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
@@ -386,7 +386,7 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
    */
   public class ArmLockHook extends ARMLockTestHookAdapter {
     @Override
-    public void beforeLock(LocalRegion owner, CacheEvent event) {
+    public void beforeLock(InternalRegion owner, CacheEvent event) {
       if (event != null) {
         if (event.getOperation().isClear() || event.getOperation().isRegionDestroy()
             || event.getOperation().isClose()) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index 5f282cf..19cc81e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -1765,8 +1765,6 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
 
   /**
    * there is one txState and zero or more txProxyStates
-   *
-   * @throws Exception
    */
   @Test
   public void testConnectionAffinity() throws Exception {
@@ -3909,7 +3907,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
       AbstractRegionMap arm = (AbstractRegionMap) ((LocalRegion) br).entries;
       arm.setARMLockTestHook(new ARMLockTestHookAdapter() {
         @Override
-        public void beforeLock(LocalRegion owner, CacheEvent event) {
+        public void beforeLock(InternalRegion owner, CacheEvent event) {
           List<Integer> ids =
               ((PartitionedRegion) getCache().getRegion(regionName)).getLocalBucketsListTestOnly();
           assertFalse(ids.isEmpty());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DestroyEntryWithConcurrentOperationJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DestroyEntryWithConcurrentOperationJUnitTest.java
deleted file mode 100644
index c0acd21..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DestroyEntryWithConcurrentOperationJUnitTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.*;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.test.junit.categories.IntegrationTest;
-
-/**
- * TestCase that emulates the conditions that entry destroy with concurrent destroy region or cache
- * close event will get expected Exception.
- */
-@Category(IntegrationTest.class)
-public class DestroyEntryWithConcurrentOperationJUnitTest {
-  /**
-   * A region entry key.
-   */
-  private static final String KEY = "KEY";
-
-  /**
-   * A region entry value.
-   */
-  private static final String VALUE =
-      " Vestibulum quis lobortis risus. Cras cursus eget dolor in facilisis. Curabitur purus arcu, dignissim ac lorem non, venenatis condimentum tellus. Praesent at erat dapibus, bibendum nunc sed, congue nulla";
-
-  /**
-   * A cache.
-   */
-  private GemFireCacheImpl cache = null;
-
-
-  @Before
-  public void setUp() throws Exception {
-    // Create our cache
-    this.cache = createCache();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    // Cleanup our cache
-    closeCache(this.cache);
-  }
-
-  /**
-   * @return the test's cache.
-   */
-  protected GemFireCacheImpl getCache() {
-    return this.cache;
-  }
-
-  /**
-   * Close a cache.
-   *
-   * @param gfc the cache to close.
-   */
-  protected void closeCache(GemFireCacheImpl gfc) {
-    gfc.close();
-  }
-
-  /**
-   * @return the test's off heap memory size.
-   */
-  protected String getOffHeapMemorySize() {
-    return "2m";
-  }
-
-  /**
-   * @return the type of region for the test.
-   */
-  protected RegionShortcut getRegionShortcut() {
-    return RegionShortcut.REPLICATE;
-  }
-
-  /**
-   * @return the region containing our test data.
-   */
-  protected String getRegionName() {
-    return "region1";
-  }
-
-  /**
-   * Creates and returns the test region with concurrency checks enabled.
-   */
-  protected Region<Object, Object> createRegion(boolean isOffHeap) {
-    return createRegion(true, isOffHeap);
-  }
-
-  /**
-   * Creates and returns the test region.
-   *
-   * @param concurrencyChecksEnabled concurrency checks will be enabled if true.
-   */
-  protected Region<Object, Object> createRegion(boolean concurrencyChecksEnabled,
-      boolean isOffHeap) {
-    return getCache().createRegionFactory(getRegionShortcut()).setOffHeap(isOffHeap)
-        .setConcurrencyChecksEnabled(concurrencyChecksEnabled).create(getRegionName());
-  }
-
-  /**
-   * Creates and returns the test cache.
-   */
-  protected GemFireCacheImpl createCache() {
-    Properties props = new Properties();
-    props.setProperty(LOCATORS, "");
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(ConfigurationProperties.OFF_HEAP_MEMORY_SIZE, getOffHeapMemorySize());
-    GemFireCacheImpl result = (GemFireCacheImpl) new CacheFactory(props).create();
-    return result;
-  }
-
-  @Test
-  public void testEntryDestroyWithCacheClose() throws Exception {
-    verifyEntryDestroyWithCacheClose(false);
-  }
-
-  @Test
-  public void testOffHeapRegionEntryDestroyWithCacheClose() throws Exception {
-    verifyEntryDestroyWithCacheClose(true);
-  }
-
-  /**
-   * Simulates the conditions setting a test hook boolean in {@link AbstractRegionMap}. This test
-   * hook forces a cache close during a destroy in a region. This test asserts that a
-   * CacheClosedException is thrown rather than an EntryNotFoundException (or any other exception
-   * type for that matter).
-   */
-  private void verifyEntryDestroyWithCacheClose(boolean isOffHeap) {
-    AbstractRegionMap.testHookRunnableForConcurrentOperation = new Runnable() {
-      @Override
-      public void run() {
-        getCache().close();
-      }
-    };
-
-    // True when the correct exception has been triggered.
-    boolean correctException = false;
-
-    Region<Object, Object> region = createRegion(isOffHeap);
-    region.put(KEY, VALUE);
-
-    try {
-      region.destroy(KEY);
-    } catch (CacheClosedException e) {
-      correctException = true;
-      // e.printStackTrace();
-    } catch (Exception e) {
-      // e.printStackTrace();
-      fail("Did not receive a CacheClosedException.  Received a " + e.getClass().getName()
-          + " instead.");
-    } finally {
-      AbstractRegionMap.testHookRunnableForConcurrentOperation = null;
-    }
-
-    assertTrue("A CacheClosedException was not triggered", correctException);
-  }
-
-  @Test
-  public void testEntryDestroyWithRegionDestroy() throws Exception {
-    verifyConcurrentRegionDestroyWithEntryDestroy(false);
-  }
-
-  @Test
-  public void testEntryDestroyWithOffHeapRegionDestroy() throws Exception {
-    verifyConcurrentRegionDestroyWithEntryDestroy(true);
-  }
-
-  /**
-   * Simulates the conditions by setting a test hook boolean in {@link AbstractRegionMap}. This test
-   * hook forces a region destroy during a destroy operation in a region. This test asserts that a
-   * RegionDestroyedException is thrown rather than an EntryNotFoundException (or any other
-   * exception type for that matter).
-   */
-  private void verifyConcurrentRegionDestroyWithEntryDestroy(boolean isOffHeap) {
-    AbstractRegionMap.testHookRunnableForConcurrentOperation = new Runnable() {
-      @Override
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(getRegionName());
-        region.destroyRegion();
-        assertTrue("Region " + getRegionName() + " is not destroyed.", region.isDestroyed());
-      }
-    };
-
-    // True when the correct exception has been triggered.
-    boolean correctException = false;
-
-    Region<Object, Object> region = createRegion(isOffHeap);
-    region.put(KEY, VALUE);
-
-    try {
-      region.destroy(KEY);
-    } catch (RegionDestroyedException e) {
-      correctException = true;
-      // e.printStackTrace();
-    } catch (Exception e) {
-      // e.printStackTrace();
-      fail("Did not receive a RegionDestroyedException.  Received a " + e.getClass().getName()
-          + " instead.");
-    } finally {
-      AbstractRegionMap.testHookRunnableForConcurrentOperation = null;
-    }
-
-    assertTrue("A RegionDestroyedException was not triggered", correctException);
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/map/DestroyEntryDuringCloseIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/map/DestroyEntryDuringCloseIntegrationTest.java
new file mode 100644
index 0000000..b8d228d
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/map/DestroyEntryDuringCloseIntegrationTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMORY_SIZE;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Properties;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Verifies that an entry destroy which races with a concurrent region destroy or cache close
+ * throws the expected exception (RegionDestroyedException or CacheClosedException).
+ */
+@Category(IntegrationTest.class)
+@RunWith(JUnitParamsRunner.class)
+public class DestroyEntryDuringCloseIntegrationTest {
+
+  private static final String KEY = "KEY";
+  private static final String VALUE = "value";
+  public static final String REGION = "region1";
+
+  private Cache cache;
+
+  @Before
+  public void setUp() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(LOCATORS, "");
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(OFF_HEAP_MEMORY_SIZE, "2m");
+    cache = new CacheFactory(props).create();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    RegionMapDestroy.testHookRunnableForConcurrentOperation = null;
+    cache.close();
+  }
+
+  private Region<Object, Object> createRegion(boolean isOffHeap) {
+    return cache.createRegionFactory(RegionShortcut.REPLICATE).setOffHeap(isOffHeap)
+        .setConcurrencyChecksEnabled(true).create(REGION);
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void testEntryDestroyWithCacheClose(boolean offheap) throws Exception {
+    RegionMapDestroy.testHookRunnableForConcurrentOperation = () -> cache.close();
+
+    Region<Object, Object> region = createRegion(offheap);
+    region.put(KEY, VALUE);
+
+    assertThatThrownBy(() -> region.destroy(KEY)).isInstanceOf(CacheClosedException.class);
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void testEntryDestroyWithRegionDestroy(boolean offheap) throws Exception {
+    RegionMapDestroy.testHookRunnableForConcurrentOperation =
+        () -> cache.getRegion(REGION).destroyRegion();
+
+    Region<Object, Object> region = createRegion(offheap);
+    region.put(KEY, VALUE);
+
+    assertThatThrownBy(() -> region.destroy(KEY)).isInstanceOf(RegionDestroyedException.class);
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
new file mode 100644
index 0000000..bf2f1f7
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.cache.AbstractRegionMap;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.KeyInfo;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionClearedException;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.RegionEntryFactory;
+import org.apache.geode.internal.cache.RegionMap.Attributes;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.VMLRURegionMap;
+import org.apache.geode.internal.cache.eviction.EvictableEntry;
+import org.apache.geode.internal.cache.eviction.EvictionController;
+import org.apache.geode.internal.cache.eviction.EvictionCounters;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class RegionMapDestroyTest {
+
+  private static EvictionAttributes evictionAttributes =
+      EvictionAttributes.createLRUEntryAttributes();
+  private static Object KEY = "key";
+
+  private AbstractRegionMap arm;
+  private boolean withConcurrencyChecks;
+  private CustomEntryConcurrentHashMap<Object, Object> entryMap;
+  private RegionEntryFactory factory;
+  private LocalRegion owner;
+  private EvictableEntry evictableEntry;
+  private EvictionController evictionController;
+  private Attributes attributes;
+
+  private EntryEventImpl event;
+  private Object expectedOldValue;
+
+  private boolean inTokenMode;
+  private boolean duringRI;
+  private boolean cacheWrite;
+  private boolean isEviction;
+  private boolean removeRecoveredEntry;
+  private boolean invokeCallbacks;
+
+  @Before
+  public void setUp() {
+    withConcurrencyChecks = true;
+    entryMap = null;
+    factory = null;
+
+    attributes = new Attributes();
+
+    owner = mock(LocalRegion.class);
+    when(owner.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
+    when(owner.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+    when(owner.getConcurrencyChecksEnabled()).thenReturn(withConcurrencyChecks);
+    doThrow(EntryNotFoundException.class).when(owner).checkEntryNotFound(any());
+
+    evictionController = mock(EvictionController.class);
+    when(evictionController.getEvictionAlgorithm()).thenReturn(evictionAttributes.getAlgorithm());
+    when(evictionController.getCounters()).thenReturn(mock(EvictionCounters.class));
+
+    evictableEntry = mock(EvictableEntry.class);
+
+    event = null;
+    inTokenMode = false;
+    duringRI = false;
+    cacheWrite = false;
+    isEviction = false;
+    expectedOldValue = null;
+    removeRecoveredEntry = false;
+    invokeCallbacks = false;
+  }
+
+  @After
+  public void tearDown() {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
+  }
+
+  private void givenConcurrencyChecks(boolean enabled) {
+    withConcurrencyChecks = enabled;
+    when(owner.getConcurrencyChecksEnabled()).thenReturn(withConcurrencyChecks);
+  }
+
+  private void givenEmptyRegionMap() {
+    arm = new SimpleRegionMap();
+    event = createEventForDestroy(arm._getOwner());
+  }
+
+  private void givenEmptyRegionMapWithMockedEntryMap() {
+    entryMap = mock(CustomEntryConcurrentHashMap.class);
+    factory = mock(RegionEntryFactory.class);
+    arm = new SimpleRegionMap(entryMap, factory);
+    event = createEventForDestroy(arm._getOwner());
+  }
+
+  private void givenEviction() {
+    when(owner.getEvictionAttributes()).thenReturn(evictionAttributes);
+    arm = new EvictableRegionMap();
+    event = createEventForDestroy(arm._getOwner());
+    isEviction = true;
+  }
+
+  private void givenEvictionWithMockedEntryMap() {
+    givenEviction();
+
+    entryMap = mock(CustomEntryConcurrentHashMap.class);
+    arm = new EvictableRegionMapWithMockedEntryMap();
+    event = createEventForDestroy(arm._getOwner());
+  }
+
+  private void givenExistingEvictableEntry(Object value) throws RegionClearedException {
+    when(evictableEntry.getValue()).thenReturn(value);
+    when(entryMap.get(KEY)).thenReturn(value == null ? null : evictableEntry);
+    when(entryMap.putIfAbsent(eq(KEY), any())).thenReturn(evictableEntry);
+    when(evictableEntry.destroy(any(), any(), anyBoolean(), anyBoolean(), any(), anyBoolean(),
+        anyBoolean())).thenReturn(true);
+  }
+
+  private void givenExistingEvictableEntryWithMockedIsTombstone() throws RegionClearedException {
+    givenExistingEvictableEntry("value");
+    when(evictableEntry.isTombstone()).thenReturn(true).thenReturn(false);
+    when(evictableEntry.destroy(any(), any(), anyBoolean(), anyBoolean(), any(), anyBoolean(),
+        anyBoolean())).thenReturn(true);
+  }
+
+  private void givenDestroyThrowsRegionClearedException() throws RegionClearedException {
+    when(evictableEntry.destroy(any(), any(), anyBoolean(), anyBoolean(), any(), anyBoolean(),
+        anyBoolean())).thenThrow(RegionClearedException.class);
+    when(entryMap.get(KEY)).thenReturn(null);
+    when(factory.createEntry(any(), any(), any())).thenReturn(evictableEntry);
+  }
+
+  private void givenExistingEntry() {
+    RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(), KEY, "value");
+    arm.getEntryMap().put(KEY, entry);
+  }
+
+  private void givenExistingEntry(Object value) {
+    RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(), KEY, value);
+    arm.getEntryMap().put(KEY, entry);
+  }
+
+  private void givenExistingEntryWithVersionTag() {
+    givenExistingEntry();
+
+    RegionVersionVector<?> versionVector = mock(RegionVersionVector.class);
+    when(arm._getOwner().getVersionVector()).thenReturn(versionVector);
+    VersionTag<?> versionTag = mock(VersionTag.class);
+    when(versionTag.hasValidVersion()).thenReturn(true);
+    event.setVersionTag(versionTag);
+  }
+
+  private void givenExistingEntryWithTokenAndVersionTag(Token token) {
+    givenExistingEntry(token);
+
+    RegionVersionVector<?> versionVector = mock(RegionVersionVector.class);
+    when(arm._getOwner().getVersionVector()).thenReturn(versionVector);
+    VersionTag<?> versionTag = mock(VersionTag.class);
+    when(versionTag.hasValidVersion()).thenReturn(true);
+    event.setVersionTag(versionTag);
+  }
+
+  private void givenRemoteEventWithVersionTag() {
+    givenOriginIsRemote();
+
+    RegionVersionVector versionVector = mock(RegionVersionVector.class);
+    when(arm._getOwner().getVersionVector()).thenReturn(versionVector);
+    VersionTag versionTag = mock(VersionTag.class);
+    when(versionTag.hasValidVersion()).thenReturn(true);
+    event.setVersionTag(versionTag);
+  }
+
+  private void givenInTokenMode() {
+    inTokenMode = true;
+  }
+
+  private void givenRemoveRecoveredEntry() {
+    removeRecoveredEntry = true;
+  }
+
+  private void givenEvictableEntryIsInUseByTransaction() {
+    when(evictableEntry.isInUseByTransaction()).thenReturn(true);
+  }
+
+  private void givenOriginIsRemote() {
+    event.setOriginRemote(true);
+  }
+
+  @Test
+  public void destroyWithEmptyRegionThrowsException() {
+    givenConcurrencyChecks(false);
+    givenEmptyRegionMap();
+
+    assertThatThrownBy(() -> arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry)).isInstanceOf(EntryNotFoundException.class);
+  }
+
+  @Test
+  public void destroyWithEmptyRegionInTokenModeAddsAToken() {
+    givenConcurrencyChecks(false);
+    givenEmptyRegionMap();
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateMapContainsTokenValue(Token.DESTROYED);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void destroyWithEmptyRegionInTokenModeWithRegionClearedExceptionDoesDestroy()
+      throws Exception {
+    givenConcurrencyChecks(false);
+    givenEmptyRegionMapWithMockedEntryMap();
+    givenDestroyThrowsRegionClearedException();
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateInvokedDestroyMethodsOnRegion(true);
+  }
+
+  @Test
+  public void evictDestroyWithEmptyRegionInTokenModeDoesNothing() {
+    givenEviction();
+    givenEmptyRegionMap();
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isFalse();
+
+    validateMapDoesNotContainKey(event.getKey());
+    validateNoDestroyInvocationsOnRegion();
+  }
+
+  @Test
+  public void evictDestroyWithExistingTombstoneInTokenModeChangesToDestroyToken() {
+    givenConcurrencyChecks(true);
+    givenEviction();
+    givenExistingEntry(Token.TOMBSTONE);
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateMapContainsTokenValue(Token.DESTROYED);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void evictDestroyWithExistingTombstoneInUseByTransactionInTokenModeDoesNothing()
+      throws RegionClearedException {
+    givenConcurrencyChecks(true);
+    givenEvictionWithMockedEntryMap();
+    givenExistingEvictableEntry(Token.TOMBSTONE);
+    givenEvictableEntryIsInUseByTransaction();
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isFalse();
+
+    validateNoDestroyInvocationsOnEvictableEntry();
+    validateNoDestroyInvocationsOnRegion();
+  }
+
+  @Test
+  public void evictDestroyWithConcurrentChangeFromNullToInUseByTransactionInTokenModeDoesNothing()
+      throws RegionClearedException {
+    givenConcurrencyChecks(true);
+    givenEvictionWithMockedEntryMap();
+    givenExistingEvictableEntry(null);
+    givenEvictableEntryIsInUseByTransaction();
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isFalse();
+
+    validateNoDestroyInvocationsOnEvictableEntry();
+    validateNoDestroyInvocationsOnRegion();
+  }
+
+  @Test
+  public void destroyWithConcurrentChangeFromNullToValidRetriesAndDoesDestroy()
+      throws RegionClearedException {
+    givenConcurrencyChecks(true);
+    givenEvictionWithMockedEntryMap();
+    givenExistingEvictableEntry("value");
+
+    when(entryMap.get(KEY)).thenReturn(null).thenReturn(evictableEntry);
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateInvokedDestroyMethodOnEvictableEntry();
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void destroyInTokenModeWithConcurrentChangeFromNullToRemovePhase2RetriesAndDoesDestroy()
+      throws RegionClearedException {
+    givenConcurrencyChecks(true);
+    givenEvictionWithMockedEntryMap();
+    givenInTokenMode();
+
+    when(evictableEntry.isRemovedPhase2()).thenReturn(true);
+    when(evictableEntry.destroy(any(), any(), anyBoolean(), anyBoolean(), any(), anyBoolean(),
+        anyBoolean())).thenReturn(true);
+    when(entryMap.get(KEY)).thenReturn(null);
+    when(entryMap.putIfAbsent(eq(KEY), any())).thenReturn(evictableEntry).thenReturn(null);
+
+    // isEviction is false despite having eviction enabled
+    isEviction = false;
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    verify(entryMap).remove(eq(KEY), eq(evictableEntry));
+    verify(entryMap, times(2)).putIfAbsent(eq(KEY), any());
+    verify(evictableEntry, never()).destroy(eq(arm._getOwner()), eq(event), eq(false), anyBoolean(),
+        eq(expectedOldValue), anyBoolean(), anyBoolean());
+
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void destroyWithConcurrentChangeFromTombstoneToValidRetriesAndDoesDestroy()
+      throws RegionClearedException {
+    givenConcurrencyChecks(true);
+    givenEvictionWithMockedEntryMap();
+    givenExistingEvictableEntryWithMockedIsTombstone();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateInvokedDestroyMethodOnEvictableEntry();
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void destroyOfExistingEntryInTokenModeAddsAToken() {
+    givenConcurrencyChecks(false);
+    givenEmptyRegionMap();
+    givenExistingEntry();
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateMapContainsTokenValue(Token.DESTROYED);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void destroyOfExistingTombstoneInTokenModeWithConcurrencyChecksDoesNothing() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenExistingEntryWithTokenAndVersionTag(Token.TOMBSTONE);
+    givenInTokenMode();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    // Why not Token.DESTROYED? The entry was already destroyed, so why run the destroy parts?
+    validateMapContainsTokenValue(Token.TOMBSTONE);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void destroyOfExistingTombstoneWithConcurrencyChecksThrowsEntryNotFound() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenExistingEntryWithTokenAndVersionTag(Token.TOMBSTONE);
+
+    assertThatThrownBy(() -> arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry)).isInstanceOf(EntryNotFoundException.class);
+  }
+
+  @Test
+  public void destroyOfExistingTombstoneWithConcurrencyChecksAndRemoveRecoveredEntryDoesRemove() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenExistingEntryWithTokenAndVersionTag(Token.TOMBSTONE);
+    givenRemoveRecoveredEntry();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateMapDoesNotContainKey(event.getKey());
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  @Test
+  public void destroyOfExistingRemovePhase2WithConcurrencyChecksAndRemoveRecoveredEntryDoesRetryAndThrowsEntryNotFound() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenExistingEntryWithTokenAndVersionTag(Token.REMOVED_PHASE2);
+    givenRemoveRecoveredEntry();
+
+    assertThatThrownBy(() -> arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry)).isInstanceOf(EntryNotFoundException.class);
+  }
+
+  @Test
+  public void destroyOfExistingEntryRemovesEntryFromMapAndDoesNotifications() {
+    givenConcurrencyChecks(false);
+    givenEmptyRegionMap();
+    givenExistingEntry();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateMapDoesNotContainKey(event.getKey());
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  /**
+   * This might be a bug. It seems like a tombstone should have been created, but there is no
+   * version tag, which might explain why none was.
+   */
+  @Test
+  public void destroyOfExistingEntryWithConcurrencyChecksAndNoVersionTagDestroysWithoutTombstone() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenExistingEntry();
+
+    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+        removeRecoveredEntry)).isTrue();
+
+    validateMapDoesNotContainKey(event.getKey());
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  /**
+   * Scenario (as arranged by the given* helpers): concurrency checks on and an existing entry with
+   * a version tag. Expects destroy to return true, the entry for the event's key to hold
+   * Token.TOMBSTONE, and basicDestroyPart2/Part3 to be invoked once each on the owner.
+   */
+  @Test
+  public void destroyOfExistingEntryWithConcurrencyChecksAddsTombstone() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenExistingEntryWithVersionTag();
+
+    final boolean destroyed = arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry);
+
+    assertThat(destroyed).isTrue();
+    validateMapContainsTokenValue(Token.TOMBSTONE);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  /**
+   * Eviction variant (set up through givenEviction) of the tombstone case: concurrency checks on
+   * and an existing entry with a version tag. Expects destroy to return true, the entry for the
+   * event's key to hold Token.TOMBSTONE, and basicDestroyPart2/Part3 to be invoked once each.
+   */
+  @Test
+  public void evictDestroyOfExistingEntryWithConcurrencyChecksAddsTombstone() {
+    givenConcurrencyChecks(true);
+    givenEviction();
+    givenExistingEntryWithVersionTag();
+
+    final boolean destroyed = arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry);
+
+    assertThat(destroyed).isTrue();
+    validateMapContainsTokenValue(Token.TOMBSTONE);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  /**
+   * Scenario (as arranged by the given* helpers): concurrency checks on and an empty region map.
+   * Expects destroy to throw EntryNotFoundException.
+   */
+  @Test
+  public void destroyWithEmptyRegionWithConcurrencyChecksThrowsException() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+
+    assertThatThrownBy(() -> {
+      arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction, expectedOldValue,
+          removeRecoveredEntry);
+    }).isInstanceOf(EntryNotFoundException.class);
+  }
+
+  /**
+   * Pins current behavior that seems to be a bug: destroy returns false and makes no
+   * basicDestroyPart2/Part3 calls on the owner, yet the entry map is left holding an entry for the
+   * event's key whose value is Token.REMOVED_PHASE1. An entry added by the destroy call should not
+   * remain in the entry map when destroy returns false.
+   */
+  @Test
+  public void evictDestroyWithEmptyRegionWithConcurrencyChecksDoesNothing() {
+    givenConcurrencyChecks(true);
+    givenEviction();
+
+    final boolean destroyed = arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry);
+
+    assertThat(destroyed).isFalse();
+    validateMapContainsTokenValue(Token.REMOVED_PHASE1);
+    validateNoDestroyInvocationsOnRegion();
+  }
+
+  /**
+   * Eviction scenario (set up through givenEviction) with concurrency checks off and no existing
+   * entry. Expects destroy to return false, the key to be absent from the entry map, and no
+   * basicDestroyPart2/Part3 calls on the owner.
+   */
+  @Test
+  public void evictDestroyWithEmptyRegionDoesNothing() {
+    givenConcurrencyChecks(false);
+    givenEviction();
+
+    final boolean destroyed = arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry);
+
+    assertThat(destroyed).isFalse();
+    validateMapDoesNotContainKey(event.getKey());
+    validateNoDestroyInvocationsOnRegion();
+  }
+
+  /**
+   * Scenario (as arranged by the given* helpers): concurrency checks on, an empty region map, and
+   * a remote event carrying a version tag. Expects destroy to return true, the entry for the
+   * event's key to hold Token.TOMBSTONE, and basicDestroyPart2/Part3 to be invoked once each.
+   */
+  @Test
+  public void destroyWithEmptyRegionWithConcurrencyChecksAddsATombstone() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenRemoteEventWithVersionTag();
+
+    final boolean destroyed = arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry);
+
+    assertThat(destroyed).isTrue();
+    validateMapContainsTokenValue(Token.TOMBSTONE);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  /**
+   * Pins current behavior that looks like a bug. Despite the "AddsATombstone" in this test's name,
+   * no TOMBSTONE is created: destroy returns true but the entry map is left with an entry whose
+   * value is Token.REMOVED_PHASE1. This is caused by code in AbstractRegionEntry.destroy() that
+   * calls removePhase1 when the versionTag is null. It seems that code path needs to tell the
+   * higher levels to call removeEntry.
+   */
+  @Test
+  public void destroyWithEmptyRegionWithConcurrencyChecksAndNullVersionTagAddsATombstone() {
+    givenConcurrencyChecks(true);
+    givenEmptyRegionMap();
+    givenOriginIsRemote();
+
+    final boolean destroyed = arm.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
+        expectedOldValue, removeRecoveredEntry);
+
+    assertThat(destroyed).isTrue();
+    validateMapContainsKey(event.getKey());
+    validateMapContainsTokenValue(Token.REMOVED_PHASE1);
+    validateInvokedDestroyMethodsOnRegion(false);
+  }
+
+  /**
+   * Verifies that evictableEntry.destroy was invoked exactly once with the map's owner, the test
+   * event, false as the third argument, and the expected old value as the fifth. The other
+   * boolean arguments may have any value.
+   */
+  private void validateInvokedDestroyMethodOnEvictableEntry() throws RegionClearedException {
+    verify(evictableEntry, times(1)).destroy(eq(arm._getOwner()), eq(event), eq(false),
+        anyBoolean(), eq(expectedOldValue), anyBoolean(), anyBoolean());
+  }
+
+  /**
+   * Asserts that the region map's entry map contains the given key.
+   *
+   * @param key the key expected to be present
+   */
+  private void validateMapContainsKey(Object key) {
+    assertThat(arm.getEntryMap()).containsKey(key);
+  }
+
+  /**
+   * Asserts that the region map's entry map does not contain the given key.
+   *
+   * @param key the key expected to be absent
+   */
+  private void validateMapDoesNotContainKey(Object key) {
+    assertThat(arm.getEntryMap()).doesNotContainKey(key);
+  }
+
+  /**
+   * Verifies that evictableEntry.destroy was never invoked, whatever the arguments.
+   */
+  private void validateNoDestroyInvocationsOnEvictableEntry() throws RegionClearedException {
+    verify(evictableEntry, never()).destroy(any(), any(), anyBoolean(), anyBoolean(), any(),
+        anyBoolean(), anyBoolean());
+  }
+
+  private void validateMapContainsTokenValue(Token token) {
+    assertThat(arm.getEntryMap()).containsKey(event.getKey());
+    RegionEntry re = (RegionEntry) arm.getEntryMap().get(event.getKey());
+    assertThat(re.getValueAsToken()).isEqualTo(token);
+  }
+
+  /**
+   * Verifies that the map's owner received exactly one basicDestroyPart2 call and exactly one
+   * basicDestroyPart3 call for the test event, with the current inTokenMode, duringRI and
+   * expectedOldValue values.
+   *
+   * <p>
+   * Side effect: assigns true to invokeCallbacks (declared outside this method) before verifying,
+   * so both verifications expect invokeCallbacks to have been passed as true.
+   *
+   * @param conflictWithClear the conflictWithClear value basicDestroyPart2 is expected to receive
+   */
+  private void validateInvokedDestroyMethodsOnRegion(boolean conflictWithClear) {
+    invokeCallbacks = true;
+    verify(arm._getOwner(), times(1)).basicDestroyPart2(any(), eq(event), eq(inTokenMode),
+        eq(conflictWithClear), eq(duringRI), eq(invokeCallbacks));
+    verify(arm._getOwner(), times(1)).basicDestroyPart3(any(), eq(event), eq(inTokenMode),
+        eq(duringRI), eq(invokeCallbacks), eq(expectedOldValue));
+  }
+
+  /**
+   * Verifies that neither basicDestroyPart2 nor basicDestroyPart3 was invoked on the map's owner,
+   * whatever the arguments.
+   */
+  private void validateNoDestroyInvocationsOnRegion() {
+    verify(arm._getOwner(), never()).basicDestroyPart2(any(), any(), anyBoolean(), anyBoolean(),
+        anyBoolean(), anyBoolean());
+    verify(arm._getOwner(), never()).basicDestroyPart3(any(), any(), anyBoolean(), anyBoolean(),
+        anyBoolean(), any());
+  }
+
+  private EntryEventImpl createEventForDestroy(LocalRegion lr) {
+    when(lr.getKeyInfo(KEY)).thenReturn(new KeyInfo(KEY, null, null));
+    return EntryEventImpl.create(lr, Operation.DESTROY, KEY, false, null, true, false);
+  }
+
+  /**
+   * AbstractRegionMap subclass used by these tests. Every instance passes null to the superclass
+   * constructor and is then initialized with owner and attributes; the two-argument constructor
+   * additionally installs a caller-supplied entry map and entry factory.
+   */
+  private class SimpleRegionMap extends AbstractRegionMap {
+
+    /** Creates a map initialized with owner and attributes only. */
+    SimpleRegionMap() {
+      super(null);
+      initialize(owner, attributes, null, false);
+    }
+
+    /**
+     * Creates a map initialized like the no-argument constructor, then replaces its entry map and
+     * entry factory with the supplied ones.
+     */
+    SimpleRegionMap(CustomEntryConcurrentHashMap<Object, Object> entryMap,
+        RegionEntryFactory factory) {
+      this();
+      setEntryMap(entryMap);
+      setEntryFactory(factory);
+    }
+  }
+
+  /**
+   * VMLRURegionMap subclass constructed from owner, attributes and evictionController, whose entry
+   * map is then replaced with entryMap (presumably a mock, going by the class name; declared
+   * outside this class).
+   */
+  private class EvictableRegionMapWithMockedEntryMap extends VMLRURegionMap {
+
+    EvictableRegionMapWithMockedEntryMap() {
+      super(owner, attributes, null, evictionController);
+      setEntryMap(entryMap);
+    }
+  }
+
+  /**
+   * VMLRURegionMap subclass constructed from owner, attributes and evictionController; the entry
+   * map set up by the superclass is left in place.
+   */
+  private class EvictableRegionMap extends VMLRURegionMap {
+
+    EvictableRegionMap() {
+      super(owner, attributes, null, evictionController);
+    }
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
klund@apache.org.