You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:38 UTC

[76/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 0000000,6fe60ce..699de2f
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@@ -1,0 -1,4164 +1,4164 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.cache;
+ 
+ 
+ import java.io.IOException;
+ import java.lang.reflect.Method;
+ import java.util.Collection;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ import com.gemstone.gemfire.internal.cache.region.entry.RegionEntryFactoryBuilder;
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.GemFireIOException;
+ import com.gemstone.gemfire.InvalidDeltaException;
+ import com.gemstone.gemfire.cache.CacheRuntimeException;
+ import com.gemstone.gemfire.cache.CacheWriter;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+ import com.gemstone.gemfire.cache.DiskAccessException;
+ import com.gemstone.gemfire.cache.EntryExistsException;
+ import com.gemstone.gemfire.cache.EntryNotFoundException;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.TimeoutException;
+ import com.gemstone.gemfire.cache.TransactionId;
+ import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
+ import com.gemstone.gemfire.cache.query.QueryException;
+ import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.ClassPathLoader;
+ import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag;
+ import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
+ import com.gemstone.gemfire.internal.cache.delta.Delta;
+ import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+ import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
+ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+ import com.gemstone.gemfire.internal.cache.versions.VersionHolder;
+ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+ import com.gemstone.gemfire.internal.concurrent.MapCallbackAdapter;
+ import com.gemstone.gemfire.internal.concurrent.MapResult;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
+ import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
+ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
+ import com.gemstone.gemfire.pdx.PdxInstance;
+ import com.gemstone.gemfire.pdx.PdxSerializationException;
+ import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
+ 
+ /**
+  * Abstract implementation of {@link RegionMap} that has all the common
+  * behavior.
+  *
+  * @since 3.5.1
+  *
+  * @author Darrel Schneider
+  *
+  */
+ 
+ //Asif: In case of sqlFabric System, we are creating a different set of RegionEntry 
+ // which are derived from the concrete  GFE RegionEntry classes.
+ // In future if any new concrete  RegionEntry class is defined, the new  SqlFabric
+ // RegionEntry Classes need to be created. There is a junit test in sqlfabric
+ // which checks for RegionEntry classes of GFE and validates the same with its 
+ // own classes.
+ 
+ public abstract class AbstractRegionMap implements RegionMap {
+ 
+   /** Logger shared by all instances of this class. */
+   private static final Logger logger = LogService.getLogger();
+   
+   /** The underlying map for this region. */
+   protected CustomEntryConcurrentHashMap<Object, Object> map;
+   /** An internal Listener for index maintenance for SQLFabric. */
+   private final IndexUpdater indexUpdater;
+ 
+   /**
+    * This test hook is used to force the conditions for defect 48182.
+    * This hook is used by Bug48182JUnitTest.
+    */
+   static Runnable testHookRunnableFor48182 =  null;
+   
+   /** Factory used to create region entries; assigned through setEntryFactory. */
+   private RegionEntryFactory entryFactory;
+   /** Attributes this map was initialized with; assigned through _setAttributes. */
+   private Attributes attr;
+   // Held as Object because initialize accepts either a LocalRegion or a
+   // PlaceHolderDiskRegion as the owner.
+   private transient Object owner; // the region that owns this map
+   
+   /**
+    * Creates the map and captures the index updater carried by the given
+    * region arguments.
+    *
+    * @param internalRegionArgs internal region arguments; when {@code null}
+    *        the index updater is {@code null}
+    */
+   protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) {
+     this.indexUpdater = (internalRegionArgs == null)
+         ? null
+         : internalRegionArgs.getIndexUpdater();
+   }
+ 
+   /**
+    * @return the index updater captured at construction; may be {@code null}
+    */
+   public final IndexUpdater getIndexUpdater() {
+     return indexUpdater;
+   }
+ 
+   /**
+    * Initializes this map for the given owner: records the attributes and the
+    * owner, creates the backing concurrent map and selects the
+    * {@link RegionEntryFactory} used to create entries.
+    *
+    * @param owner the owning region; must be a {@link LocalRegion} or a
+    *        {@link PlaceHolderDiskRegion}
+    * @param attr supplies the initial capacity, load factor, concurrency
+    *        level and statistics-enabled flag
+    * @param internalRegionArgs handed to the SQLFabric entry factory provider
+    *        when the cache is a SQLFabric system
+    * @param isLRU forwarded to the entry factory selection
+    * @throws IllegalStateException if owner is neither a LocalRegion nor a
+    *         PlaceHolderDiskRegion
+    * @throws CacheRuntimeException if the SQLFabric entry factory provider
+    *         cannot be loaded or invoked
+    */
+   protected void initialize(Object owner,
+                             Attributes attr,
+                             InternalRegionArguments internalRegionArgs,
+                             boolean isLRU) {
+     _setAttributes(attr);
+     setOwner(owner);
+     _setMap(createConcurrentMap(attr.initialCapacity, attr.loadFactor,
+         attr.concurrencyLevel, false,
+         new AbstractRegionEntry.HashRegionEntryCreator()));
+ 
+     // Derive the storage characteristics from whichever owner type we got.
+     final GemFireCacheImpl cache;
+     boolean isDisk;
+     boolean withVersioning = false;
+     boolean offHeap = false;
+     if (owner instanceof LocalRegion) {
+       LocalRegion region = (LocalRegion)owner;
+       isDisk = region.getDiskRegion() != null;
+       cache = region.getGemFireCache();
+       withVersioning = region.getConcurrencyChecksEnabled();
+       offHeap = region.getOffHeap();
+     }
+     else if (owner instanceof PlaceHolderDiskRegion) {
+       offHeap = ((PlaceHolderDiskRegion) owner).getOffHeap();
+       isDisk = true;
+       withVersioning = ((PlaceHolderDiskRegion)owner).getFlags().contains(
+           DiskRegionFlag.IS_WITH_VERSIONING);
+       cache = GemFireCacheImpl.getInstance();
+     }
+     else {
+       throw new IllegalStateException(
+           "expected LocalRegion or PlaceHolderDiskRegion");
+     }
+ 
+     if (cache != null && cache.isSqlfSystem()) {
+       // SQLFabric supplies its own entry classes; the provider is looked up
+       // reflectively by class name and its static factory method is invoked.
+       String provider = GemFireCacheImpl.SQLF_ENTRY_FACTORY_PROVIDER;
+       try {
+         Class<?> factoryProvider = ClassPathLoader.getLatest().forName(provider);
+         Method method = factoryProvider.getDeclaredMethod(
+             "getRegionEntryFactory", new Class[] { Boolean.TYPE, Boolean.TYPE,
+                 Boolean.TYPE, Object.class, InternalRegionArguments.class });
+         RegionEntryFactory ref = (RegionEntryFactory)method.invoke(null,
+             new Object[] { Boolean.valueOf(attr.statisticsEnabled),
+                 Boolean.valueOf(isLRU), Boolean.valueOf(isDisk), owner,
+                 internalRegionArgs });
+ 
+         // TODO need to have the SQLF entry factory support version stamp storage
+         // (withVersioning and offHeap are not passed to the provider above)
+         setEntryFactory(ref);
+ 
+       }
+       catch (Exception e) {
+         // The trailing "{ }" creates an anonymous subclass of CacheRuntimeException.
+         throw new CacheRuntimeException(
+             "Exception in obtaining RegionEntry Factory" + " provider class ",
+             e) {
+         };
+       }
+     }
+     else {
+       setEntryFactory(new RegionEntryFactoryBuilder().getRegionEntryFactoryOrNull(attr.statisticsEnabled,isLRU,isDisk,withVersioning,offHeap));
+     }
+   }
+ 
+   /**
+    * Creates the backing concurrent map.
+    *
+    * @param entryCreator optional hash entry creator; when {@code null} the
+    *        map is built with the constructor that takes no creator
+    * @return a new, empty map configured with the given parameters
+    */
+   protected CustomEntryConcurrentHashMap<Object, Object> createConcurrentMap(
+       int initialCapacity, float loadFactor, int concurrencyLevel,
+       boolean isIdentityMap,
+       CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> entryCreator) {
+     if (entryCreator == null) {
+       return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity,
+           loadFactor, concurrencyLevel, isIdentityMap);
+     }
+     return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity,
+         loadFactor, concurrencyLevel, isIdentityMap, entryCreator);
+   }
+ 
+   /**
+    * Makes the given region the owner of this map unless it already is.
+    *
+    * @param r the new owner
+    */
+   public void changeOwner(LocalRegion r) {
+     if (r != _getOwnerObject()) {
+       setOwner(r);
+     }
+   }
+ 
+   /**
+    * Replaces the factory used to create region entries.
+    *
+    * @param f the factory to use from now on
+    */
+   @Override
+   public final void setEntryFactory(RegionEntryFactory f) {
+     entryFactory = f;
+   }
+ 
+   /**
+    * @return the factory currently used to create region entries
+    */
+   public final RegionEntryFactory getEntryFactory() {
+     return entryFactory;
+   }
+ 
+   /**
+    * Stores the attributes this map is configured with.
+    *
+    * @param a the attributes to remember
+    */
+   protected final void _setAttributes(Attributes a) {
+     attr = a;
+   }
+ 
+   /**
+    * @return the attributes stored by {@code _setAttributes}
+    */
+   public final Attributes getAttributes() {
+     return attr;
+   }
+   
+   /**
+    * Returns the owner cast to {@link LocalRegion}. The cast fails with a
+    * ClassCastException when the owner is of another type; callers can check
+    * with {@code _isOwnerALocalRegion()} first.
+    *
+    * @return the owning region, or {@code null} if no owner is set
+    */
+   protected final LocalRegion _getOwner() {
+     return (LocalRegion) owner;
+   }
+ 
+   /**
+    * @return {@code true} if the current owner is a {@link LocalRegion}
+    */
+   protected boolean _isOwnerALocalRegion() {
+     return owner instanceof LocalRegion;
+   }
+ 
+   /**
+    * @return the owner exactly as stored, without any cast
+    */
+   protected final Object _getOwnerObject() {
+     return owner;
+   }
+ 
+   /**
+    * Records the object that owns this map.
+    *
+    * @param r the new owner
+    */
+   public final void setOwner(Object r) {
+     owner = r;
+   }
+   
+   /**
+    * @return the backing concurrent map
+    */
+   protected final CustomEntryConcurrentHashMap<Object, Object> _getMap() {
+     return map;
+   }
+ 
+   /**
+    * Replaces the backing concurrent map.
+    *
+    * @param m the map to use from now on
+    */
+   protected final void _setMap(CustomEntryConcurrentHashMap<Object, Object> m) {
+     map = m;
+   }
+ 
+   /**
+    * @return the number of entries in the backing map
+    */
+   public int size() {
+     final CustomEntryConcurrentHashMap<Object, Object> backing = _getMap();
+     return backing.size();
+   }
+ 
+   /**
+    * Returns the number of entries in the backing map. This is currently
+    * used by stats and eviction.
+    */
+   @Override
+   public int sizeInVM() {
+     final CustomEntryConcurrentHashMap<Object, Object> backing = _getMap();
+     return backing.size();
+   }
+ 
+   /**
+    * @return {@code true} if the backing map holds no entries
+    */
+   public boolean isEmpty() {
+     final CustomEntryConcurrentHashMap<Object, Object> backing = _getMap();
+     return backing.isEmpty();
+   }
+ 
+   /**
+    * @return the key set of the backing map
+    */
+   public Set keySet() {
+     final CustomEntryConcurrentHashMap<Object, Object> backing = _getMap();
+     return backing.keySet();
+   }
+ 
+   /**
+    * Returns the values of the backing map viewed as region entries. The map
+    * is declared with Object values, hence the raw, unchecked conversion.
+    */
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   public Collection<RegionEntry> regionEntries() {
+     final Collection rawValues = _getMap().values();
+     return rawValues;
+   }
+ 
+   /**
+    * Returns the values of the backing map viewed as region entries. The map
+    * is declared with Object values, hence the raw, unchecked conversion.
+    */
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   @Override
+   public Collection<RegionEntry> regionEntriesInVM() {
+     final Collection rawValues = _getMap().values();
+     return rawValues;
+   }
+ 
+   /**
+    * Tells whether a usable entry exists for the key.
+    *
+    * @param key the key to look up
+    * @return {@code true} if {@code getEntry(key)} yields an entry that is not
+    *         removed
+    */
+   public final boolean containsKey(Object key) {
+     final RegionEntry entry = getEntry(key);
+     return entry != null && !entry.isRemoved();
+   }
+ 
+   /**
+    * Looks up the entry for the key, hiding entries marked for eviction.
+    *
+    * @param key the key to look up
+    * @return the entry, or {@code null} if there is none or it is marked for
+    *         eviction
+    */
+   public RegionEntry getEntry(Object key) {
+     final RegionEntry found = (RegionEntry) _getMap().get(key);
+     // an entry marked for eviction has been faulted in from HDFS; hide it
+     return (found == null || !found.isMarkedForEviction()) ? found : null;
+   }
+ 
+   /**
+    * Looks up the entry for the key carried by the event.
+    *
+    * @param event the event whose key is looked up
+    * @return the result of {@code getEntry(Object)} for that key
+    */
+   protected RegionEntry getEntry(EntryEventImpl event) {
+     final Object eventKey = event.getKey();
+     return getEntry(eventKey);
+   }
+ 
+ 
+   /**
+    * Returns whatever the backing map holds for the key, without the
+    * marked-for-eviction filtering done by {@code getEntry(Object)}.
+    */
+   @Override
+   public final RegionEntry getEntryInVM(Object key) {
+     final Object stored = _getMap().get(key);
+     return (RegionEntry) stored;
+   }
+ 
+ 
+   /**
+    * Inserts the entry unless one is already mapped to the key.
+    * <p>
+    * If an off-heap entry was just inserted while the owning LocalRegion is
+    * being closed or destroyed, the entry is taken back out and released so
+    * it is not orphaned (#48068), and the owner's readiness check is run.
+    *
+    * @param key the key to map
+    * @param re the entry to insert
+    * @return the entry previously mapped to the key, or {@code null} if
+    *         {@code re} was inserted
+    */
+   public final RegionEntry putEntryIfAbsent(Object key, RegionEntry re) {
+     final RegionEntry existing = (RegionEntry) _getMap().putIfAbsent(key, re);
+     if (existing != null || !(re instanceof OffHeapRegionEntry)) {
+       return existing;
+     }
+     if (!_isOwnerALocalRegion() || !_getOwner().isThisRegionBeingClosedOrDestroyed()) {
+       return existing;
+     }
+     // prevent orphan during concurrent destroy (#48068)
+     if (_getMap().remove(key, re)) {
+       ((OffHeapRegionEntry) re).release();
+     }
+     _getOwner().checkReadiness(); // throw RegionDestroyedException
+     return existing;
+   }
+ 
+   /**
+    * Looks up the entry in the backing map, hiding entries marked for
+    * eviction.
+    *
+    * @return the entry, or {@code null} if there is none or it is marked for
+    *         eviction
+    */
+   @Override
+   public final RegionEntry getOperationalEntryInVM(Object key) {
+     final RegionEntry found = (RegionEntry) _getMap().get(key);
+     // an entry marked for eviction has been faulted in from HDFS; hide it
+     return (found == null || !found.isMarkedForEviction()) ? found : null;
+   }
+  
+ 
+   /**
+    * Removes the given entry from the backing map if it is still mapped to
+    * the key. A tombstone that is currently mapped and not marked for
+    * eviction is never removed here; the attempt is logged as fatal.
+    *
+    * @param key the key of the entry
+    * @param re the entry to remove
+    * @param updateStat whether to decrement the entry count on removal
+    */
+   public final void removeEntry(Object key, RegionEntry re, boolean updateStat) {
+     final boolean mappedTombstone =
+         re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction();
+     if (mappedTombstone) {
+       logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
+       return; // can't remove tombstones except from the tombstone sweeper
+     }
+     if (!_getMap().remove(key, re)) {
+       return;
+     }
+     re.removePhase2();
+     if (updateStat) {
+       incEntryCount(-1);
+     }
+   }
+ 
+   /**
+    * Removes the given entry from the backing map, notifying the supplied
+    * index updater before the removal and, in all cases, afterwards.
+    * A tombstone that is currently mapped and not marked for eviction is
+    * never removed here; the attempt is logged as fatal and the index updater
+    * is not invoked.
+    *
+    * @param key the key of the entry
+    * @param re the entry to remove
+    * @param updateStat whether to decrement the entry count on removal
+    * @param event the event driving the removal
+    * @param owner the region passed to the index updater and to removePhase1
+    * @param indexUpdater optional index updater; may be {@code null}
+    */
+   public final void removeEntry(Object key, RegionEntry re, boolean updateStat,
+       EntryEventImpl event, final LocalRegion owner,
+       final IndexUpdater indexUpdater) {
+     if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()) {
+       logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
+       return; // can't remove tombstones except from the tombstone sweeper
+     }
+     boolean removed = false;
+     try {
+       if (indexUpdater != null) {
+         indexUpdater.onEvent(owner, event, re);
+       }
+ 
+       // Custom eviction calls removeEntry rather than re.destroy, apparently
+       // to avoid firing callbacks. The value still has to go through
+       // removePhase1 so that the entry is removed from disk.
+       if (event.isCustomEviction() && !re.isRemoved()) {
+         try {
+           re.removePhase1(owner, false);
+         } catch (RegionClearedException ignored) {
+           // that's ok, this was only an eviction attempt
+         }
+       }
+ 
+       if (_getMap().remove(key, re)) {
+         re.removePhase2();
+         removed = true;
+         if (updateStat) {
+           incEntryCount(-1);
+         }
+       }
+     } finally {
+       if (indexUpdater != null) {
+         indexUpdater.postEvent(owner, event, re, removed);
+       }
+     }
+   }
+ 
+   /**
+    * Adds {@code delta} to the owner's entry-count statistic. Does nothing
+    * when there is no owner or the owner has no cache perf stats.
+    *
+    * @param delta the amount to add; may be negative
+    */
+   protected final void incEntryCount(int delta) {
+     final LocalRegion region = _getOwner();
+     if (region == null) {
+       return;
+     }
+     final CachePerfStats perfStats = region.getCachePerfStats();
+     if (perfStats == null) {
+       return;
+     }
+     perfStats.incEntryCount(delta);
+   }
+   
+   /**
+    * Increments the clear-count statistic of the given region. Does nothing
+    * for a {@code null} region, an {@link HARegion}, or a region without
+    * cache perf stats.
+    *
+    * @param lr the region being cleared; may be {@code null}
+    */
+   final void incClearCount(LocalRegion lr) {
+     if (lr == null || lr instanceof HARegion) {
+       return;
+     }
+     final CachePerfStats perfStats = lr.getCachePerfStats();
+     if (perfStats == null) {
+       return;
+     }
+     perfStats.incClearCount();
+   }
+ 
+   /** Removes every mapping from the backing map. */
+   private void _mapClear() {
+     final CustomEntryConcurrentHashMap<Object, Object> backing = _getMap();
+     backing.clear();
+   }
+   
+   /**
+    * Closes this map by clearing it without a region version vector.
+    */
+   public void close() {
+     // Disabled code, kept for reference: it released the events held in
+     // a suspect-entries collection.
+     /*
+     for (SuspectEntryList l: this.suspectEntries.values()) {
+       for (EntryEventImpl e: l) {
+         e.release();
+       }
+     }
+     */
+     clear(null);
+   }
+   
+   /**
+    * Clear the region and, if an RVV is given, return a collection of the
+    * version sources in all remaining tags.
+    *
+    * @param rvv the region version vector describing what to clear. When
+    *        {@code null} the whole map is cleared; otherwise only entries
+    *        whose version is contained in {@code rvv} are removed.
+    * @return the version sources of the entries that were retained (empty
+    *         when {@code rvv} is {@code null}), or {@code null} when the
+    *         owner is not a LocalRegion
+    */
+   public Set<VersionSource> clear(RegionVersionVector rvv)
+   {
+     Set<VersionSource> result = new HashSet<VersionSource>();
+     
+     if(!_isOwnerALocalRegion()) {
+       //Fix for #41333. Just clear the map
+       //if we failed during initialization.
+       _mapClear();
+       return null;
+     }
+     if (logger.isDebugEnabled()) {
+       // pass rvv itself: the format string already supplies the "rvv=" label
+       logger.debug("Clearing entries for {} rvv={}", _getOwner(), rvv);
+     }
+     LocalRegion lr = _getOwner();
+     RegionVersionVector localRvv = lr.getVersionVector();
+     incClearCount(lr);
+     // lock for size calcs if the region might have tombstones
+     Object lockObj = lr.getConcurrencyChecksEnabled()? lr.getSizeGuard() : new Object();
+     synchronized (lockObj) {
+       if (rvv == null) {
+         int delta = 0;
+         try {
+           delta = sizeInVM(); // TODO soplog need to determine if stats should
+                               // reflect only size in memory or the complete thing
+         } catch (GemFireIOException e) {
+           // ignore rather than throwing an exception during cache close
+         }
+         int tombstones = lr.getTombstoneCount();
+         _mapClear();
+         _getOwner().updateSizeOnClearRegion(delta - tombstones);
+         _getOwner().incTombstoneCount(-tombstones);
+         if (delta != 0) {
+           incEntryCount(-delta);
+         }
+       } else {
+         int delta = 0;
+         int tombstones = 0;
+         VersionSource myId = _getOwner().getVersionMember();
+         if (localRvv != rvv) {
+           localRvv.recordGCVersions(rvv);
+         }
+         final boolean isTraceEnabled = logger.isTraceEnabled();
+         for (RegionEntry re : regionEntries()) {
+           synchronized(re) {
+             Token value = re.getValueAsToken();
+             // if it's already being removed or the entry is being created we leave it alone
+             if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2) {
+               continue;
+             }
+             
+             // an entry without a member id in its stamp is attributed to this member
+             VersionSource id = re.getVersionStamp().getMemberID();
+             if (id == null) {
+               id = myId;
+             }
+             if (rvv.contains(id, re.getVersionStamp().getRegionVersion())) {
+               if (isTraceEnabled) {
+                 logger.trace("region clear op is removing {} {}", re.getKey(), re.getVersionStamp());
+               }
+ 
+               boolean tombstone = re.isTombstone();
+               // note: it.remove() did not reliably remove the entry so we use remove(K,V) here
+               if (_getMap().remove(re.getKey(), re)) {
+                 if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
+                   GatewaySenderEventImpl.release(re._getValue()); // OFFHEAP _getValue ok
+                 }
+                 //If this is an overflow only region, we need to free the entry on
+                 //disk at this point.
+                 try {
+                   re.removePhase1(lr, true);
+                 } catch (RegionClearedException e) {
+                   //do nothing, it's already cleared.
+                 }
+                 re.removePhase2();
+                 lruEntryDestroy(re);
+                 if (tombstone) {
+                   _getOwner().incTombstoneCount(-1);
+                   tombstones += 1;
+                 } else {
+                   delta += 1;
+                 }
+               }
+             } else { // rvv does not contain this entry so it is retained
+               result.add(id);
+             }
+           }
+         }
+         _getOwner().updateSizeOnClearRegion(delta);
+         // the entry count includes tombstones, so both kinds are subtracted
+         incEntryCount(-delta);
+         incEntryCount(-tombstones);
+         if (logger.isDebugEnabled()) {
+           logger.debug("Size after clearing = {}", _getMap().size());
+         }
+         if (isTraceEnabled && _getMap().size() < 20) {
+           _getOwner().dumpBackingMap();
+         }
+       }
+     }
+     return result;
+   }
+   
+   /** No-op by default; LRU maps need to override this method. */
+   public void lruUpdateCallback() {
+   }
+   /** No-op by default; LRU maps need to override this method. */
+   public void lruUpdateCallback(boolean b) {
+   }
+   /** No-op by default; LRU maps need to override this method. */
+   public void lruUpdateCallback(int i) {
+   }
+ 
+   /**
+    * Does nothing by default; LRU maps need to override this method.
+    *
+    * @return always {@code false} in this implementation
+    */
+   public boolean disableLruUpdateCallback() {
+     return false;
+   }
+ 
+   /** No-op by default; LRU maps need to override this method. */
+   public void enableLruUpdateCallback() {
+   }
+ 
+   /** No-op by default; LRU maps need to override this method. */
+   public void resetThreadLocals() {
+   }
+ 
+   /**
+    * Tell an LRU that a new entry has been created.
+    * This implementation does nothing.
+    *
+    * @param e the entry that was created
+    */
+   protected void lruEntryCreate(RegionEntry e) {
+   }
+ 
+   /**
+    * Tell an LRU that an existing entry has been destroyed.
+    * This implementation does nothing.
+    *
+    * @param e the entry that was destroyed
+    */
+   protected void lruEntryDestroy(RegionEntry e) {
+   }
+ 
+   /**
+    * Tell an LRU that an existing entry has been modified.
+    * This implementation does nothing.
+    *
+    * @param e the entry that was modified
+    */
+   protected void lruEntryUpdate(RegionEntry e) {
+   }
+ 
+   /**
+    * Decrements the reference count of the entry, passing the owner along
+    * only when it is a LocalRegion ({@code null} otherwise).
+    *
+    * @param e the entry whose reference count is decremented
+    */
+   @Override
+   public void decTxRefCount(RegionEntry e) {
+     final LocalRegion region = _isOwnerALocalRegion() ? _getOwner() : null;
+     e.decRefCount(null, region);
+   }
+ 
+   /**
+    * @return always {@code false} in this implementation
+    */
+   public boolean lruLimitExceeded() {
+     final boolean exceeded = false;
+     return exceeded;
+   }
+ 
+   /** Does nothing by default. */
+   public void lruCloseStats() {
+   }
+   
+   /**
+    * Does nothing by default.
+    *
+    * @param entry the entry that was faulted in
+    */
+   public void lruEntryFaultIn(LRUEntry entry) {
+   }
+   
+   /**
+    * Process an incoming version tag for concurrent operation detection.
+    * This must be done before modifying the region entry. Entries without a
+    * version stamp are left untouched.
+    * @param re the entry that is to be modified
+    * @param event the modification to the entry
+    * @throws InvalidDeltaException if the event contains a delta that cannot be applied
+    * @throws ConcurrentCacheModificationException if the event is in conflict
+    *    with a previously applied change
+    */
+   private void processVersionTag(RegionEntry re, EntryEventImpl event) {
+     if (re.getVersionStamp() == null) {
+       return;
+     }
+     re.getVersionStamp().processVersionTag(event);
+ 
+     // during initialization we record version tag info to detect ops the
+     // image provider hasn't seen
+     final VersionTag<?> versionTag = event.getVersionTag();
+     if (versionTag == null || event.getRegion().isInitialized()) {
+       return;
+     }
+     final ImageState imageState = event.getRegion().getImageState();
+     if (imageState == null || event.getRegion().isUsedForPartitionedRegionBucket()) {
+       return;
+     }
+     if (logger.isTraceEnabled()) {
+       logger.trace("recording version tag in image state: {}", versionTag);
+     }
+     imageState.addVersionTag(event.getKey(), versionTag);
+   }
+   
+   /**
+    * Hands a version received for an entry to that entry's version stamp.
+    * The entry must have a version stamp.
+    *
+    * @param re the entry whose stamp processes the version
+    * @param owner the region whose member id is passed to the stamp
+    * @param entryVersion the version to process
+    * @param isTombstone forwarded to the stamp as given
+    * @param sender the member the version came from
+    * @param checkConflicts forwarded to the stamp as given
+    */
+   private void processVersionTagForGII(RegionEntry re, LocalRegion owner, VersionTag entryVersion, boolean isTombstone, InternalDistributedMember sender, boolean checkConflicts) {
+     re.getVersionStamp().processVersionTag(
+         _getOwner(),
+         entryVersion,
+         isTombstone,
+         false,
+         owner.getMyId(),
+         sender,
+         checkConflicts);
+   }
+ 
+   /**
+    * Completes recovery of this map's contents.
+    * <p>
+    * When {@code rm} is given, every entry is moved out of {@code rm} and
+    * re-created in this map with this map's entry factory. When {@code rm}
+    * is {@code null}, the entries already in this map are accounted for in
+    * the owner's size and entry-count statistics. In both cases the
+    * tombstones found are scheduled with the owner afterwards, in the order
+    * produced by an {@link OrderedTombstoneMap}.
+    *
+    * @param rm the map to drain recovered entries from, or {@code null};
+    *        it is cast to {@link AbstractRegionMap}
+    */
+   public void copyRecoveredEntries(RegionMap rm) {
+     //We need to sort the tombstones before scheduling them,
+     //so that they will be in the correct order.
+     OrderedTombstoneMap<RegionEntry> tombstones = new OrderedTombstoneMap<RegionEntry>();
+     if (rm != null) {
+       CustomEntryConcurrentHashMap<Object, Object> other = ((AbstractRegionMap)rm)._getMap();
+       Iterator<Map.Entry<Object, Object>> it = other
+           .entrySetWithReusableEntries().iterator();
+       while (it.hasNext()) {
+         Map.Entry<Object, Object> me = it.next();
+         it.remove(); // This removes the RegionEntry from "rm" but it does not decrement its refcount to an offheap value.
+         RegionEntry oldRe = (RegionEntry)me.getValue();
+         Object key = me.getKey();
+         
+         // retained here; released in the finally block below
+         @Retained @Released Object value = oldRe._getValueRetain((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true);
+ 
+         try {
+           if (value == Token.NOT_AVAILABLE) {
+             // fix for bug 43993
+             value = null;
+           }
+           // tombstones are dropped when this region does not do concurrency checks
+           // (the finally block still runs for this entry)
+           if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) {
+             continue;
+           }
+           RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
+           copyRecoveredEntry(oldRe, newRe);
+           // newRe is now in this._getMap().
+           if (newRe.isTombstone()) {
+             VersionTag tag = newRe.getVersionStamp().asVersionTag();
+             tombstones.put(tag, newRe);
+           }
+           _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
+           incEntryCount(1);
+           lruEntryUpdate(newRe);
+         } finally {
+           // the old entry is released only when OffHeapHelper.release reports true
+           if (OffHeapHelper.release(value)) {
+             ((OffHeapRegionEntry)oldRe).release();
+           }
+         }
+         lruUpdateCallback();
+       }
+     } else {
+       // count everything first; unversioned tombstones are subtracted below
+       incEntryCount(size());
+       for (Iterator<RegionEntry> iter = regionEntries().iterator(); iter.hasNext(); ) {
+         RegionEntry re = iter.next();
+         if (re.isTombstone()) {
+           if (re.getVersionStamp() == null) { // bug #50992 - recovery from versioned to non-versioned
+             incEntryCount(-1);
+             iter.remove();
+             continue;
+           } else {
+             tombstones.put(re.getVersionStamp().asVersionTag(), re);
+           }
+         }
+         _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re));
+       }
+       // Since lru was not being done during recovery call it now.
+       lruUpdateCallback();
+     }
+     
+     //Schedule all of the tombstones, now that we have sorted them
+     Map.Entry<VersionTag, RegionEntry> entry;
+     while((entry = tombstones.take()) != null) {
+       // refresh the tombstone so it doesn't time out too soon
+       _getOwner().scheduleTombstone(entry.getValue(), entry.getKey());
+     }
+     
+   }
+   
+   /**
+    * Carries the state of a recovered entry over to its replacement and puts
+    * the replacement into the backing map.
+    * <p>
+    * If the new entry has a version stamp, member id and versions are copied
+    * from the old entry's stamp. If the new entry is an
+    * {@link AbstractOplogDiskRegionEntry}, its disk id is taken from the old
+    * entry and the owner's disk region is told about the replacement.
+    *
+    * @param oldRe the recovered entry being replaced
+    * @param newRe the entry that replaces it
+    */
+   protected void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe) {
+     if (newRe.getVersionStamp() != null) {
+       newRe.getVersionStamp().setMemberID(oldRe.getVersionStamp().getMemberID());
+       newRe.getVersionStamp().setVersions(oldRe.getVersionStamp().asVersionTag());
+     }
+ 
+     if (newRe instanceof AbstractOplogDiskRegionEntry) {
+       final AbstractOplogDiskRegionEntry diskBacked = (AbstractOplogDiskRegionEntry) newRe;
+       diskBacked.setDiskId(oldRe);
+       _getOwner().getDiskRegion().replaceIncompatibleEntry((DiskEntry) oldRe, (DiskEntry) newRe);
+     }
+ 
+     _getMap().put(newRe.getKey(), newRe);
+   }
+ 
+   /**
+    * Creates an entry for a value recovered from disk and inserts it into
+    * this map.
+    * <p>
+    * An existing entry that is removed but not a tombstone is retried until
+    * the insert succeeds. Any other existing entry is an error.
+    *
+    * @param key the key of the recovered entry
+    * @param value the recovered value; its version tag, if any, is applied
+    *        to the new entry's version stamp
+    * @return the newly inserted entry
+    * @throws IllegalStateException if a live entry or a tombstone already
+    *         exists for the key
+    */
+   @Retained     // Region entry may contain an off-heap value
+   public final RegionEntry initRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
+     boolean needsCallback = false;
+     @Retained RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
+     synchronized (newRe) {
+       if (value.getVersionTag()!=null && newRe.getVersionStamp()!=null) {
+         newRe.getVersionStamp().setVersions(value.getVersionTag());
+       }
+       RegionEntry oldRe = putEntryIfAbsent(key, newRe);
+       while (oldRe != null) {
+         synchronized (oldRe) {
+           // a removed, non-tombstone entry is on its way out of the map: retry
+           if (oldRe.isRemoved() && !oldRe.isTombstone()) {
+             oldRe = putEntryIfAbsent(key, newRe);
+             if (oldRe != null) {
+               if (_isOwnerALocalRegion()) {
+                 _getOwner().getCachePerfStats().incRetries();
+               }
+             }
+           } 
+           /*
+            * Entry already exists which should be impossible.
+            * Free the current entry (if off-heap) and
+            * throw an exception.
+            */
+           else {
+             if (newRe instanceof OffHeapRegionEntry) {
+               ((OffHeapRegionEntry) newRe).release();
+             }
+ 
+             throw new IllegalStateException("Could not recover entry for key " + key + ".  The entry already exists!");
+           }
+         } // synchronized
+       }
+       // size, tombstone and entry-count bookkeeping only applies to a LocalRegion owner
+       if (_isOwnerALocalRegion()) {
+         _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
+         if (newRe.isTombstone()) {
+           // refresh the tombstone so it doesn't time out too soon
+           _getOwner().scheduleTombstone(newRe, newRe.getVersionStamp().asVersionTag());
+         }
+         
+         incEntryCount(1); // we are creating an entry that was recovered from disk including tombstone
+       }
+       lruEntryUpdate(newRe);
+       needsCallback = true;
+     }
+     // the LRU callback is made after the lock on newRe has been released
+     if (needsCallback) {
+       lruUpdateCallback();
+     }
+     
+     EntryLogger.logRecovery(_getOwnerObject(), key, value);
+     
+     return newRe;
+   }
+   
+   /**
+    * Applies a value recovered from disk to the entry that already exists
+    * for the key.
+    *
+    * @param key the key of the recovered entry
+    * @param value the recovered value; its version tag, if any, is applied
+    *        to the entry's version stamp
+    * @return the updated entry, or {@code null} if {@code getEntry(key)}
+    *         finds no entry or the entry is removed and not a tombstone
+    * @throws IllegalStateException if setting the value raises a
+    *         RegionClearedException
+    */
+   public final RegionEntry updateRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
+     boolean needsCallback = false;
+     RegionEntry re = getEntry(key);
+     if (re == null) {
+       return null;
+     }
+     synchronized (re) {
+       if (re.isRemoved() && !re.isTombstone()) {
+         return null;
+       }
+       if (value.getVersionTag()!=null && re.getVersionStamp()!=null) {
+         re.getVersionStamp().setVersions(value.getVersionTag());
+       }
+       try {
+         if (_isOwnerALocalRegion()) {
+           if (re.isTombstone()) {
+             // when a tombstone is to be overwritten, unschedule it first
+             _getOwner().unscheduleTombstone(re);
+           }
+           // capture the size before the value changes so the delta can be reported
+           final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
+           re.setValue(_getOwner(), value); // OFFHEAP no need to call AbstractRegionMap.prepareValueForCache because setValue is overridden for disk and that code takes apart value (RecoveredEntry) and prepares its nested value for the cache
+           // the recovered value may itself be a tombstone, so reschedule if needed
+           if (re.isTombstone()) {
+             _getOwner().scheduleTombstone(re, re.getVersionStamp().asVersionTag());
+           }
+           _getOwner().updateSizeOnPut(key, oldSize, _getOwner().calculateRegionEntryValueSize(re));
+         } else {
+           // the owner is not a LocalRegion here; it is cast to PlaceHolderDiskRegion
+           DiskEntry.Helper.updateRecoveredEntry((PlaceHolderDiskRegion)_getOwnerObject(),
+               (DiskEntry)re, value, (RegionEntryContext) _getOwnerObject());
+         }
+       } catch (RegionClearedException rce) {
+         throw new IllegalStateException("RegionClearedException should never happen in this context", rce);
+       }
+       lruEntryUpdate(re);
+       needsCallback = true;
+     }
+     // the LRU callback is made after the lock on re has been released
+     if (needsCallback) {
+       lruUpdateCallback();
+     }
+     
+     EntryLogger.logRecovery(_getOwnerObject(), key, value);
+     
+     return re;
+   }
+ 
+   public final boolean initialImagePut(final Object key, // processes one key/value pair arriving via initial image transfer ("GII" in the comments below): updates the existing entry for key or installs a new one
+                                        final long lastModified, // forwarded to RegionEntry.initialImagePut / initialImageInit
+                                        Object newValue, // incoming value; Token.TOMBSTONE marks a tombstone; may be replaced below for HARegion owners
+                                        final boolean wasRecovered, // forwarded to the region entry; also feeds the last argument of processVersionTagForGII
+                                        boolean deferLRUCallback, // true => lruUpdateCallback() is not invoked by this method
+                                        VersionTag entryVersion, InternalDistributedMember sender, boolean isSynchronizing) // entryVersion is null-checked before version processing; sender and isSynchronizing are only passed on to processVersionTagForGII
+   {
+     boolean result = false; // value returned at the bottom of the method
+     boolean done = false; // true once one of the two put paths ran to completion; drives the cleanup in the finally blocks
+     boolean cleared = false; // true when RegionClearedException or QueryException was caught
+     final LocalRegion owner = _getOwner();
+     
+     if (newValue == Token.TOMBSTONE && !owner.getConcurrencyChecksEnabled()) {
+       return false; // tombstones are ignored when concurrency checks are disabled
+     }
+ 
+     if (owner instanceof HARegion && newValue instanceof CachedDeserializable) { // HARegion only: share one HAEventWrapper per event through the HA container
+       Object actualVal = ((CachedDeserializable)newValue)
+         .getDeserializedValue(null, null);
+       if (actualVal instanceof HAEventWrapper) {
+         HAEventWrapper haEventWrapper = (HAEventWrapper)actualVal;
+         // Key was removed at sender side so not putting it into the HARegion
+         if (haEventWrapper.getClientUpdateMessage() == null) {
+           return false;
+         }
+         // Getting the instance from singleton CCN..This assumes only one bridge
+         // server in the VM
+         HAContainerWrapper haContainer = (HAContainerWrapper)CacheClientNotifier
+             .getInstance().getHaContainer();
+         Map.Entry entry = null;
+         HAEventWrapper original = null;
+         synchronized (haContainer) { // lookup and registration happen under the container's monitor
+           entry = (Map.Entry)haContainer.getEntry(haEventWrapper);
+           if (entry != null) { // an equal wrapper is already registered: bump its reference count
+             original = (HAEventWrapper)entry.getKey();
+             original.incAndGetReferenceCount();
+           }
+           else { // first sighting: register this wrapper together with its client update message
+             haEventWrapper.incAndGetReferenceCount();
+             haEventWrapper.setHAContainer(haContainer);
+             haContainer.put(haEventWrapper, haEventWrapper
+                 .getClientUpdateMessage());
+             haEventWrapper.setClientUpdateMessage(null);
+             haEventWrapper.setIsRefFromHAContainer(true);
+           }
+         }
+         if (entry != null) { // outside the container monitor: merge CQ/interest info and store the already-registered wrapper instead of the new one
+           HARegionQueue.addClientCQsAndInterestList(entry, haEventWrapper,
+               haContainer, owner.getName());
+           haEventWrapper.setClientUpdateMessage(null);
+           newValue = CachedDeserializableFactory.create(original,
+               ((CachedDeserializable)newValue).getSizeInBytes()); // the size of the incoming value is reused for the replacement
+         }
+       }
+     }
+     
+     try {
+       RegionEntry newRe = getEntryFactory().createEntry(owner, key,
+           Token.REMOVED_PHASE1); // candidate entry created with a REMOVED_PHASE1 token; only used if no live entry exists for key
+       EntryEventImpl event = null;
+       
+       @Retained @Released Object oldValue = null; // released by OffHeapHelper.release in the finally that closes this inner try
+       
+       try {
+       RegionEntry oldRe = null;
+       synchronized (newRe) {
+         try {
+           oldRe = putEntryIfAbsent(key, newRe);
+           while (!done && oldRe != null) { // an entry already exists for key: work on it under its own monitor
+             synchronized (oldRe) {
+               if (oldRe.isRemovedPhase2()) { // existing entry is in the REMOVED_PHASE2 state: try again to install newRe and count the retry if another entry is found
+                 oldRe = putEntryIfAbsent(key, newRe);
+                 if (oldRe != null) {
+                   owner.getCachePerfStats().incRetries();
+                 }
+               }
+               else {
+                 boolean acceptedVersionTag = false;
+                 if (entryVersion != null && owner.concurrencyChecksEnabled) {
+                   Assert.assertTrue(entryVersion.getMemberID() != null, "GII entry versions must have identifiers");
+                   try {
+                     boolean isTombstone = (newValue == Token.TOMBSTONE);
+                     // don't reschedule the tombstone if it hasn't changed
+                     boolean isSameTombstone = oldRe.isTombstone() && isTombstone
+                               && oldRe.getVersionStamp().asVersionTag()
+                                 .equals(entryVersion);
+                     if (isSameTombstone) {
+                       return true; // early return with done == false: the finally blocks below still run their !done cleanup
+                     }
+                     processVersionTagForGII(oldRe, owner, entryVersion, isTombstone, sender, !wasRecovered || isSynchronizing);
+                     acceptedVersionTag = true;
+                   } catch (ConcurrentCacheModificationException e) {
+                     return false; // version check rejected the incoming entry
+                   }
+                 }
+                 final boolean oldIsTombstone = oldRe.isTombstone(); // state and size are captured before the entry is modified
+                 final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
+                 // Neeraj: The below if block is to handle the special
+                 // scenario witnessed in SqlFabric for now. (Though it's
+                 // a general scenario). The scenario is that during GII
+                 // it is possible that updates start coming before the
+                 // base value reaches through GII. In that scenario the deltas
+                 // for that particular key are kept on being added to a list
+                 // of deltas. When the base value arrives through this path
+                 // of GII the oldValue will be that list of deltas. When the
+                 // base value arrives the deltas are applied one by one on that list.
+                 // The same scenario is applicable for GemFire also but the below 
+                 // code will be executed only in case of sqlfabric now. Probably
+                 // the code can be made more generic for both SQL Fabric and GemFire.
+                 if (indexUpdater != null) {
+                   oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas
+                   if (oldValue instanceof ListOfDeltas) {
+                   // apply the deltas on this new value. update index
+                   // Make a new event object
+                   // make it an insert operation
+                   LocalRegion rgn = owner;
+                   if (owner instanceof BucketRegion) { // for a bucket, the event is created against its partitioned region
+                     rgn = ((BucketRegion)owner).getPartitionedRegion();
+                   }
+                   event = EntryEventImpl.create(rgn, Operation.CREATE, key, null,
+                       Boolean.TRUE /* indicate that GII is in progress */,
+                       false, null);
+                   try {
+                   event.setOldValue(newValue); // the received base value is handed to the event as its old value before the deltas are applied
+                   if (logger.isDebugEnabled()) {
+                     logger.debug("initialImagePut: received base value for list of deltas; event: {}", event);
+                   }
+                   ((ListOfDeltas)oldValue).apply(event);
+                   Object preparedNewValue =oldRe.prepareValueForCache(owner,
+                       event.getNewValueAsOffHeapDeserializedOrRaw(), true);
 -                  if(preparedNewValue instanceof Chunk) {
++                  if(preparedNewValue instanceof ObjectChunk) {
+                     event.setNewValue(preparedNewValue);
+                   }
+                   oldRe.setValue(owner, preparedNewValue, event);
+                   //event.setNewValue(event.getOldValue());
+                   event.setOldValue(null);
+                   try {
+                     indexUpdater.onEvent(owner, event, oldRe);
+                     lruEntryUpdate(oldRe);
+                     owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(oldRe));
+                     EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
+                     result = true;
+                     done = true;
+                     break; // leaves the while loop; the two enclosing finally blocks run first (postEvent, then event.release)
+                   } finally {
+                     // this must be done within the oldRe sync block
+                     indexUpdater.postEvent(owner, event, oldRe, done);
+                   }
+                   } finally {
+                     if (event != null) {
+                       event.release();
+                       event = null;
+                     }
+                   }
+                   }
+                 }
+                 try { // regular path: put the incoming value into the existing entry
+                   if (indexUpdater != null) {
+                     event = EntryEventImpl.create(owner, Operation.CREATE, key,
+                         newValue,
+                         Boolean.TRUE /* indicate that GII is in progress */,
+                         false, null);
+                     indexUpdater.onEvent(owner, event, oldRe);
+                   }
+                   result = oldRe.initialImagePut(owner, lastModified, newValue, wasRecovered, acceptedVersionTag);
+                   if (result) {
+                     if (oldIsTombstone) { // the entry used to be a tombstone: unschedule it, then LRU create (real value) or LRU update (still a tombstone)
+                       owner.unscheduleTombstone(oldRe);
+                       if (newValue != Token.TOMBSTONE){
+                         lruEntryCreate(oldRe);
+                       } else {
+                         lruEntryUpdate(oldRe);
+                       }
+                     }
+                     if (newValue == Token.TOMBSTONE) {
+                       if (owner.getServerProxy() == null &&
+                           owner.getVersionVector().isTombstoneTooOld(entryVersion.getMemberID(), entryVersion.getRegionVersion())) { // NOTE(review): entryVersion is dereferenced without a null check here -- confirm tombstones always arrive with a version tag
+                         // the received tombstone has already been reaped, so don't retain it
+                         removeTombstone(oldRe, entryVersion, false, false);
+                         return false; // early return with done == false (result is true but is not the value returned)
+                       } else {
+                         owner.scheduleTombstone(oldRe, entryVersion);
+                         lruEntryDestroy(oldRe);
+                       }
+                     } else {
+                       int newSize = owner.calculateRegionEntryValueSize(oldRe);
+                       if(!oldIsTombstone) { // a former tombstone is accounted as a create, anything else as a put
+                         owner.updateSizeOnPut(key, oldSize, newSize);
+                       } else {
+                         owner.updateSizeOnCreate(key, newSize);
+                       }
+                       EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
+                     }
+                   }
+                   if (owner.getIndexManager() != null) {
+                     owner.getIndexManager().updateIndexes(oldRe, oldRe.isRemoved() ? IndexManager.ADD_ENTRY : IndexManager.UPDATE_ENTRY, 
+                         oldRe.isRemoved() ? IndexProtocol.OTHER_OP : IndexProtocol.AFTER_UPDATE_OP);
+                   }
+                   done = true; // set even when result is false; ends the while loop
+                 } finally {
+                   if (indexUpdater != null) {
+                     indexUpdater.postEvent(owner, event, oldRe, result);
+                   }
+                   if (event != null) {
+                     event.release();
+                     event = null;
+                   }
+                 }
+               }
+             }
+           }
+           if (!done) { // the loop ended with oldRe == null: newRe is the entry in the map, so initialize it
+             boolean versionTagAccepted = false;
+             if (entryVersion != null && owner.concurrencyChecksEnabled) {
+               Assert.assertTrue(entryVersion.getMemberID() != null, "GII entry versions must have identifiers");
+               try {
+                 boolean isTombstone = (newValue == Token.TOMBSTONE);
+                 processVersionTagForGII(newRe, owner, entryVersion, isTombstone, sender, !wasRecovered || isSynchronizing);
+                 versionTagAccepted = true;
+               } catch (ConcurrentCacheModificationException e) {
+                 return false; // done is still false, so the finally below removes newRe from the map
+               }
+             }
+             result = newRe.initialImageInit(owner, lastModified, newValue,
+                 true, wasRecovered, versionTagAccepted);
+             try {
+               if (result) {
+                 if (indexUpdater != null) {
+                   event = EntryEventImpl.create(owner, Operation.CREATE, key,
+                       newValue,
+                       Boolean.TRUE /* indicate that GII is in progress */,
+                       false, null);
+                   indexUpdater.onEvent(owner, event, newRe);
+                 }
+                 if (newValue == Token.TOMBSTONE) {
+                   owner.scheduleTombstone(newRe, entryVersion);
+                 } else {
+                   owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(newRe));
+                   EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
+                   lruEntryCreate(newRe);
+                 }
+                 incEntryCount(1); // counted for tombstones as well as for real values
+               }
+               //Update local indexes
+               if (owner.getIndexManager() != null) {
+                 owner.getIndexManager().updateIndexes(newRe, newRe.isRemoved() ? IndexManager.REMOVE_ENTRY : IndexManager.UPDATE_ENTRY, 
+                     newRe.isRemoved() ? IndexProtocol.OTHER_OP : IndexProtocol.AFTER_UPDATE_OP);
+               }
+               done = true;
+             } finally {
+               if (result && indexUpdater != null) { // unlike the existing-entry path, postEvent is skipped here when result is false
+                 indexUpdater.postEvent(owner, event, newRe, done);
+               }
+               if (event != null) {
+                 event.release();
+                 event = null;
+               }
+             }
+           }
+         }
+         finally { // still inside synchronized (newRe)
+           if (done && result) {
+             initialImagePutEntry(newRe); // hook, called with newRe even when the update went to an existing entry
+           }
+           if (!done) { // any path that did not complete (early return or exception): take the candidate entry back out and update the indexes
+             removeEntry(key, newRe, false);
+             if (owner.getIndexManager() != null) {
+               owner.getIndexManager().updateIndexes(newRe, IndexManager.REMOVE_ENTRY, IndexProtocol.OTHER_OP);
+             }
+           }
+        } 
+       } // synchronized
+       } finally { // last-chance release of the event and of the retained old value
+         if (event != null) event.release();
+         OffHeapHelper.release(oldValue);
+       }
+     } catch(RegionClearedException rce) {
+       //Asif: do not issue any sort of callbacks
+       done = false;
+       cleared= true;
+     }catch(QueryException qe) { // handled exactly like RegionClearedException: no callbacks, and result is returned as it stands
+       done = false;
+       cleared= true;
+     }
+     finally {
+       if (done && !deferLRUCallback) {
+         lruUpdateCallback();
+       }
+       else if (!cleared) { // NOTE(review): this branch is also taken when done && deferLRUCallback -- confirm resetting the thread locals is intended in that case
+         resetThreadLocals();
+       }
+     }
+     return result;
+   }
+ 
+   protected void initialImagePutEntry(RegionEntry newRe) { // hook called by initialImagePut, inside its synchronized (newRe) block, once a put completed with a true result; intentionally empty here
+   }
+ 
+   boolean confirmEvictionDestroy(RegionEntry re) // called by destroy() for eviction-driven destroys; this version does not examine re and always trips the assertion
+   {
+     /* We aren't in an LRU context, and should never get here */
+     Assert.assertTrue(false,
+         "Not an LRU region, can not confirm LRU eviction operation");
+     return true; // reached only if Assert.assertTrue(false, ...) returns normally -- presumably it throws; TODO confirm
+   }
+ 
+   public final boolean destroy(EntryEventImpl event,
+                                boolean inTokenMode,
+                                boolean duringRI,
+                                boolean cacheWrite,
+                                boolean isEviction,
+                                Object expectedOldValue,
+                                boolean removeRecoveredEntry)
+   throws CacheWriterException, EntryNotFoundException, TimeoutException {
+     
+     final LocalRegion owner = _getOwner();
+ 
+     if (owner == null) {
+       Assert.assertTrue(false, "The owner for RegionMap " + this    // "fix" for bug 32440
+           + " is null for event " + event);
+     }
+     
+     boolean retry = true;
+     
+     while (retry) {
+       retry = false;
+ 
+       boolean opCompleted = false;
+       boolean doPart3 = false;
+ 
+       // We need to acquire the region entry while holding the lock to avoid #45620.
+       // However, we also want to release the lock before distribution to prevent
+       // potential deadlocks.  The outer try/finally ensures that the lock will be
+       // released without fail.  I'm avoiding indenting just to preserve the ability
+       // to track diffs since the code is fairly complex.
+       boolean doUnlock = true;
+       lockForCacheModification(owner, event);
+       try {
+ 
+ 
+         RegionEntry re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, true, true); 
+         RegionEntry tombstone = null;
+         boolean haveTombstone = false;
+         /*
+          * Execute the test hook runnable inline (not threaded) if it is not null. 
+          */
+         if(null != testHookRunnableFor48182) {
+           testHookRunnableFor48182.run();
+         }    
+ 
+         try {
+           if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT) && !(owner instanceof HARegion)) {
+             logger.trace(LogMarker.LRU_TOMBSTONE_COUNT,
+                 "ARM.destroy() inTokenMode={}; duringRI={}; riLocalDestroy={}; withRepl={}; fromServer={}; concurrencyEnabled={}; isOriginRemote={}; isEviction={}; operation={}; re={}",
+                 inTokenMode, duringRI, event.isFromRILocalDestroy(), owner.dataPolicy.withReplication(), event.isFromServer(),
+                 owner.concurrencyChecksEnabled, event.isOriginRemote(), isEviction, event.getOperation(), re);
+           }
+           if (event.isFromRILocalDestroy()) {
+             // for RI local-destroy we don't want to keep tombstones.
+             // In order to simplify things we just set this recovery
+             // flag to true to force the entry to be removed
+             removeRecoveredEntry = true;
+           }
+           // the logic in this method is already very involved, and adding tombstone
+           // permutations to (re != null) greatly complicates it.  So, we check
+           // for a tombstone here and, if found, pretend for a bit that the entry is null
+           if (re != null && re.isTombstone() && !removeRecoveredEntry) {
+             tombstone = re;
+             haveTombstone = true;
+             re = null;
+           }
+           IndexManager oqlIndexManager = owner.getIndexManager() ; 
+           if (re == null) {
+             // we need to create an entry if in token mode or if we've received
+             // a destroy from a peer or WAN gateway and we need to retain version
+             // information for concurrency checks
+             boolean retainForConcurrency = (!haveTombstone
+                 && (owner.dataPolicy.withReplication() || event.isFromServer())
+                 && owner.concurrencyChecksEnabled
+                 && (event.isOriginRemote() /* destroy received from other must create tombstone */
+                     || event.isFromWANAndVersioned() /* wan event must create a tombstone */
+                     || event.isBridgeEvent())); /* event from client must create a tombstone so client has a version # */ 
+             if (inTokenMode
+                 || retainForConcurrency) { 
+               // removeRecoveredEntry should be false in this case
+               RegionEntry newRe = getEntryFactory().createEntry(owner,
+                   event.getKey(),
+                   Token.REMOVED_PHASE1);
+               // Fix for Bug #44431. We do NOT want to update the region and wait
+               // later for index INIT as region.clear() can cause inconsistency if
+               // happened in parallel as it also does index INIT.
+               if (oqlIndexManager != null) {
+                 oqlIndexManager.waitForIndexInit();
+               }
+               try {
+                 synchronized (newRe) {
+                   RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
+                   while (!opCompleted && oldRe != null) {
+                     synchronized (oldRe) {
+                       if (oldRe.isRemovedPhase2()) {
+                         oldRe = putEntryIfAbsent(event.getKey(), newRe);
+                         if (oldRe != null) {
+                           owner.getCachePerfStats().incRetries();
+                         }
+                       } else {
+                         event.setRegionEntry(oldRe);
+ 
+                         // Last transaction related eviction check. This should
+                         // prevent
+                         // transaction conflict (caused by eviction) when the entry
+                         // is being added to transaction state.
+                         if (isEviction) {
+                           if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
+                             opCompleted = false;
+                             return opCompleted;
+                           }
+                         }
+                         try {
+                           //if concurrency checks are enabled, destroy will
+                           //set the version tag
+                           boolean destroyed = destroyEntry(oldRe, event, inTokenMode, cacheWrite, expectedOldValue, false, removeRecoveredEntry);
+                           if (destroyed) {
+                             if (retainForConcurrency) {
+                               owner.basicDestroyBeforeRemoval(oldRe, event);
+                             }
+                             owner.basicDestroyPart2(oldRe, event, inTokenMode,
+                                 false /* conflict with clear */, duringRI, true);
+                             lruEntryDestroy(oldRe);
+                             doPart3 = true;
+                           }
+                         }
+                         catch (RegionClearedException rce) {
+                           // Ignore. The exception will ensure that we do not update
+                           // the LRU List
+                           owner.basicDestroyPart2(oldRe, event, inTokenMode,
+                               true/* conflict with clear */, duringRI, true);
+                           doPart3 = true;
+                         } catch (ConcurrentCacheModificationException ccme) {
+                           VersionTag tag = event.getVersionTag();
+                           if (tag != null && tag.isTimeStampUpdated()) {
+                             // Notify gateways of new time-stamp.
+                             owner.notifyTimestampsToGateways(event);
+                           }
+                           throw ccme;
+                         }
+                         re = oldRe;
+                         opCompleted = true;
+                       }
+                     } // synchronized oldRe
+                   } // while
+                   if (!opCompleted) {
+                     // The following try has a finally that cleans up the newRe.
+                     // This is only needed if newRe was added to the map which only
+                     // happens if we didn't get completed with oldRe in the above while loop.
+                     try {
+                       re = newRe;
+                       event.setRegionEntry(newRe);
+ 
+                       try {
+                         //if concurrency checks are enabled, destroy will
+                         //set the version tag
+                         if (isEviction) {
+                           opCompleted = false;
+                           return opCompleted; 
+                         }
+                         opCompleted = destroyEntry(newRe, event, inTokenMode, cacheWrite, expectedOldValue, true, removeRecoveredEntry);
+                         if (opCompleted) {
+                           // This is a new entry that was created because we are in
+                           // token mode or are accepting a destroy operation by adding
+                           // a tombstone.  There is no oldValue, so we don't need to
+                           // call updateSizeOnRemove
+                           //                    owner.recordEvent(event);
+                           event.setIsRedestroyedEntry(true);  // native clients need to know if the entry didn't exist
+                           if (retainForConcurrency) {
+                             owner.basicDestroyBeforeRemoval(oldRe, event);
+                           }
+                           owner.basicDestroyPart2(newRe, event, inTokenMode,
+                               false /* conflict with clear */, duringRI, true);
+                           doPart3 = true;
+                         }
+                       }
+                       catch (RegionClearedException rce) {
+                         // Ignore. The exception will ensure that we do not update
+                         // the LRU List
+                         opCompleted = true;
+                         EntryLogger.logDestroy(event);
+                         owner.basicDestroyPart2(newRe, event, inTokenMode, true /* conflict with clear*/, duringRI, true);
+                         doPart3 = true;
+                       } catch (ConcurrentCacheModificationException ccme) {
+                         VersionTag tag = event.getVersionTag();
+                         if (tag != null && tag.isTimeStampUpdated()) {
+                           // Notify gateways of new time-stamp.
+                           owner.notifyTimestampsToGateways(event);
+                         }
+                         throw ccme;
+                       }
+                       // Note no need for LRU work since the entry is destroyed
+                       // and will be removed when gii completes
+                     } finally {
+                       if (!opCompleted && !haveTombstone  /* to fix bug 51583 do this for all operations */ ) {
+                         removeEntry(event.getKey(), newRe, false);
+                       }
+                       if (!opCompleted && isEviction) {
+                         removeEntry(event.getKey(), newRe, false);
+                       }
+                     }
+                   } // !opCompleted
+                 } // synchronized newRe
+               } finally {
+                 if (oqlIndexManager != null) {
+                   oqlIndexManager.countDownIndexUpdaters();
+                 }
+               }
+             } // inTokenMode or tombstone creation
+             else {
+               if (!isEviction || owner.concurrencyChecksEnabled) {                                 
+                 // The following ensures that there is not a concurrent operation
+                 // on the entry and leaves behind a tombstone if concurrencyChecksEnabled.
+                 // It fixes bug #32467 by propagating the destroy to the server even though
+                 // the entry isn't in the client
+                 RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner, event.getKey(),
+                     Token.REMOVED_PHASE1);
+                 synchronized(newRe) {
+                   if (haveTombstone && !tombstone.isTombstone()) {
+                     // we have to check this again under synchronization since it may have changed
+                     retry = true;
+                     //retryEntry = tombstone; // leave this in place for debugging
+                     continue;
+                   }
+                   re = (RegionEntry)_getMap().putIfAbsent(event.getKey(), newRe);
+                   if (re != null && re != tombstone) {
+                     // concurrent change - try again
+                     retry = true;
+                     //retryEntry = tombstone; // leave this in place for debugging
+                     continue;
+                   }
+                   else if (!isEviction) {
+                     boolean throwex = false;
+                     EntryNotFoundException ex =  null;
+                     try {
+                       if (!cacheWrite) {
+                         throwex = true;
+                       } else {
+                         try {
+                           if (!removeRecoveredEntry) {
+                             throwex = !owner.bridgeWriteBeforeDestroy(event, expectedOldValue);
+                           }
+                         } catch (EntryNotFoundException e) {
+                           throwex = true;
+                           ex = e; 
+                         }
+                       }
+                       if (throwex) {
+                         if (!event.isOriginRemote() && !event.getOperation().isLocal() &&
+                             (event.isFromBridgeAndVersioned() ||  // if this is a replayed client event that already has a version
+                                 event.isFromWANAndVersioned())) { // or if this is a WAN event that has been applied in another system
+                           // we must distribute these since they will update the version information in peers
+                           if (logger.isDebugEnabled()) {
+                             logger.debug("ARM.destroy is allowing wan/client destroy of {} to continue", event.getKey());
+                           }
+                           throwex = false;
+                           event.setIsRedestroyedEntry(true);
+                           // Distribution of this op happens on re and re might me null here before
+                           // distributing this destroy op.
+                           if (re == null) {
+                             re = newRe;
+                           }
+                           doPart3 = true;
+                         }
+                       }
+                       if (throwex) {                    
+                         if (ex == null) {
+                           // Fix for 48182, check cache state and/or region state before sending entry not found.
+                           // this is from the server and any exceptions will propogate to the client
+                           owner.checkEntryNotFound(event.getKey());
+                         } else {
+                           throw ex;
+                         }
+                       }
+                     } finally {
+                       // either remove the entry or leave a tombstone
+                       try {
+                         if (!event.isOriginRemote() && event.getVersionTag() != null && owner.concurrencyChecksEnabled) {
+                           // this shouldn't fail since we just created the entry.
+                           // it will either generate a tag or apply a server's version tag
+                           processVersionTag(newRe, event);
+                           if (doPart3) {
+                             owner.generateAndSetVersionTag(event, newRe);
+                           }
+                           try {
+                             owner.recordEvent(event);
+                             newRe.makeTombstone(owner, event.getVersionTag());
+                           } catch (RegionClearedException e) {
+                             // that's okay - when writing a tombstone into a disk, the
+                             // region has been cleared (including this tombstone)
+                           }
+                           opCompleted = true;
+                           //                    lruEntryCreate(newRe);
+                         } else if (!haveTombstone) {
+                           try {
+                             assert newRe != tombstone;
+                             newRe.setValue(owner, Token.REMOVED_PHASE2);
+                             removeEntry(event.getKey(), newRe, false);
+                           } catch (RegionClearedException e) {
+                             // that's okay - we just need to remove the new entry
+                           }
+                         } else if (event.getVersionTag() != null ) { // haveTombstone - update the tombstone version info
+                           processVersionTag(tombstone, event);
+                           if (doPart3) {
+                             owner.generateAndSetVersionTag(event, newRe);
+                           }
+                           // This is not conflict, we need to persist the tombstone again with new version tag 
+                           try {
+                             tombstone.setValue(owner, Token.TOMBSTONE);
+                           } catch (RegionClearedException e) {
+                             // that's okay - when writing a tombstone into a disk, the
+                             // region has been cleared (including this tombstone)
+                           }
+                           owner.recordEvent(event);
+                           owner.rescheduleTombstone(tombstone, event.getVersionTag());
+                           owner.basicDestroyPart2(tombstone, event, inTokenMode, true /* conflict with clear*/, duringRI, true);
+                           opCompleted = true;
+                         }
+                       } catch (ConcurrentCacheModificationException ccme) {
+                         VersionTag tag = event.getVersionTag();
+                         if (tag != null && tag.isTimeStampUpdated()) {
+                           // Notify gateways of new time-stamp.
+                           owner.notifyTimestampsToGateways(event);
+                         }
+                         throw ccme;
+                       }
+                     }
+                   }
+                 } // synchronized(newRe)
+               }
+             }
+           } // no current entry
+           else { // current entry exists
+             if (oqlIndexManager != null) {
+               oqlIndexManager.waitForIndexInit();
+             }
+             try {
+               synchronized (re) {
+                 // if the entry is a tombstone and the event is from a peer or a client
+                 // then we allow the operation to be performed so that we can update the
+                 // version stamp.  Otherwise we would retain an old version stamp and may allow
+                 // an operation that is older than the destroy() to be applied to the cache
+                 // Bug 45170: If removeRecoveredEntry, we treat tombstone as regular entry to be deleted
+                 boolean createTombstoneForConflictChecks = (owner.concurrencyChecksEnabled
+                     && (event.isOriginRemote() || event.getContext() != null || removeRecoveredEntry));
+                 if (!re.isRemoved() || createTombstoneForConflictChecks) {
+                   if (re.isRemovedPhase2()) {
+                     retry = true;
+                     continue;
+                   }
+                   if (!event.isOriginRemote() && event.getOperation().isExpiration()) {
+                     // If this expiration started locally then only do it if the RE is not being used by a tx.
+                     if (re.isInUseByTransaction()) {
+                       opCompleted = false;
+                       return opCompleted;
+                     }
+                   }
+                   event.setRegionEntry(re);
+ 
+                   // See comment above about eviction checks
+                   if (isEviction) {
+                     assert expectedOldValue == null;
+                     if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
+                       opCompleted = false;
+                       return opCompleted;
+                     }
+                   }
+ 
+                   boolean removed = false;
+                   try {
+                     opCompleted = destroyEntry(re, event, inTokenMode, cacheWrite, expectedOldValue, false, removeRecoveredEntry);
+                     if (opCompleted) {
+                       // It is very, very important for Partitioned Regions to keep
+                       // the entry in the map until after distribution occurs so that other
+                       // threads performing a create on this entry wait until the destroy
+                       // distribution is finished.
+                       // keeping backup copies consistent. Fix for bug 35906.
+                       // -- mthomas 07/02/2007 <-- how about that date, kinda cool eh?
+                       owner.basicDestroyBeforeRemoval(re, event);
+ 
+                       // do this before basicDestroyPart2 to fix bug 31786
+                       if (!inTokenMode) {
+                         if ( re.getVersionStamp() == null) {
+                           re.removePhase2();
+                           removeEntry(event.getKey(), re, true, event, owner,
+                               indexUpdater);
+                           removed = true;
+                         }
+                       }
+                       if (inTokenMode && !duringRI) {
+                         event.inhibitCacheListenerNotification(true);
+                       }
+                       doPart3 = true;
+                       owner.basicDestroyPart2(re, event, inTokenMode, false /* conflict with clear*/, duringRI, true);
+                       //                  if (!re.isTombstone() || isEviction) {
+                       lruEntryDestroy(re);
+                       //                  } else {
+                       //                    lruEntryUpdate(re);
+                       //                    lruUpdateCallback = true;
+                       //                  }
+                     } else {
+                       if (!inTokenMode) {
+                         EntryLogger.logDestroy(event);
+                         owner.recordEvent(event);
+                         if (re.getVersionStamp() == null) {
+                           re.removePhase2();
+                           removeEntry(event.getKey(), re, true, event, owner,
+                               indexUpdater);
+                           lruEntryDestroy(re);
+                         } else {
+                           if (re.isTombstone()) {
+                             // the entry is already a tombstone, but we're destroying it
+                             // again, so we need to reschedule the tombstone's expiration
+                             if (event.isOriginRemote()) {
+                               owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
+                             }
+                           }
+                         }
+                         lruEntryDestroy(re);
+                         opCompleted = true;
+                       }
+                     }
+                   }
+                   catch (RegionClearedException rce) {
+                     // Ignore. The exception will ensure that we do not update
+                     // the LRU List
+                     opCompleted = true;
+                     owner.recordEvent(event);
+                     if (inTokenMode && !duringRI) {
+                       event.inhibitCacheListenerNotification(true);
+                     }
+                     owner.basicDestroyPart2(re, event, inTokenMode, true /*conflict with clear*/, duringRI, true);
+                     doPart3 = true;
+                   }
+                   finally {
+                     if (re.isRemoved() && !re.isTombstone()) {
+                       if (!removed) {
+                         removeEntry(event.getKey(), re, true, event, owner,
+                             indexUpdater);
+                       }
+                     }
+                   }
+                 } // !isRemoved
+                 else { // already removed
+                   if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) {
+                     // For HDFS region there may be a race with eviction
+                     // so retry the operation. fixes bug 49150
+                     retry = true;
+                     continue;
+                   }
+                   if (re.isTombstone() && event.getVersionTag() != null) {
+                     // if we're dealing with a tombstone and this is a remote event
+                     // (e.g., from cache client update thread) we need to update
+                     // the tombstone's version information
+                     // TODO use destroyEntry() here
+                     processVersionTag(re, event);
+                     try {
+                       re.makeTombstone(owner, event.getVersionTag());
+                     } catch (RegionClearedException e) {
+                       // that's okay - when writing a tombstone into a disk, the
+                       // region has been cleared (including this tombstone)
+                     }
+                   }
+                   if (expectedOldValue != null) {
+                     // if re is removed then there is no old value, so return false
+                     return false;
+                   }
+ 
+                   if (!inTokenMode && !isEviction) {
+                     owner.checkEntryNotFound(event.getKey());
+                   }
+                 }
+               } // synchronized re
+             }  catch (ConcurrentCacheModificationException ccme) {
+               VersionTag tag = event.getVersionTag();
+               if (tag != null && tag.isTimeStampUpdated()) {
+                 // Notify gateways of new time-stamp.
+                 owner.notifyTimestampsToGateways(event);
+               }
+               throw ccme;
+             } finally {
+               if (oqlIndexManager != null) {
+                 oqlIndexManager.countDownIndexUpdaters();
+               }
+             }
+             // No need to call lruUpdateCallback since the only lru action
+             // we may have taken was lruEntryDestroy. This fixes bug 31759.
+ 
+           } // current entry exists
+           if(opCompleted) {
+             EntryLogger.logDestroy(event);
+           }
+           return opCompleted;
+         }
+         finally {
+           releaseCacheModificationLock(owner, event);
+           doUnlock = false;
+ 
+           try {
+             // If concurrency conflict is there and event contains gateway version tag then
+             // do NOT distribute.
+             if (event.isConcurrencyConflict() &&
+                 (event.getVersionTag() != null && event.getVersionTag().isGatewayTag())) {
+               doPart3 = false;
+             }
+             // distribution and listener notification
+             if (doPart3) {
+               owner.basicDestroyPart3(re, event, inTokenMode, duringRI, true, expectedOldValue);
+             }
+           } finally {
+             if (opCompleted) {
+               if (re != null) {
+                 owner.cancelExpiryTask(re);
+               } else if (tombstone != null) {
+                 owner.cancelExpiryTask(tombstone);
+               }
+             }
+           }
+         }
+ 
+       } finally { // failsafe on the read lock...see comment above
+         if (doUnlock) {
+           releaseCacheModificationLock(owner, event);
+         }
+       }
+     } // retry loop
+     return false;
+   }
+   /**
+    * Applies a destroy of {@code key}, performed by a transaction, to this region map.
+    * <p>
+    * The work runs between {@code lockForTXCacheModification(owner, versionTag)} and
+    * {@code releaseTXCacheModificationLock(owner, versionTag)}. Which path is taken depends
+    * on the result of {@code getEntry(key)}:
+    * <ol>
+    * <li><b>Entry present.</b> Under {@code synchronized (re)}, and only if the entry is not
+    * removed or is a tombstone: a callback event is created and the destroy is recorded on
+    * {@code txEvent} (when non-null). In token mode the entry value is set to
+    * {@code Token.DESTROYED} (a tombstone old value is unscheduled first). Outside token mode
+    * an entry that is already a tombstone is only rescheduled; any other entry is either made
+    * a tombstone (no {@code indexUpdater}, {@code shouldPerformConcurrencyChecks} is true and
+    * the event carries a version tag) or removed from the map via
+    * {@code removePhase1}/{@code removePhase2}/{@code removeEntry}.</li>
+    * <li><b>No entry, and {@code inTokenMode} or {@code owner.concurrencyChecksEnabled}.</b>
+    * A new entry holding {@code Token.DESTROYED} is offered with {@code putEntryIfAbsent}.
+    * If another entry is found instead, that entry is destroyed (the put is retried while the
+    * found entry is in removed-phase-2); otherwise the new entry is used. Either way the entry
+    * becomes a tombstone when concurrency checks apply and the event has a version tag, or is
+    * removed again when not in token mode.</li>
+    * <li><b>No entry otherwise.</b> Only a callback event is created and the AFTER_DESTROY
+    * callbacks are invoked or queued.</li>
+    * </ol>
+    * Callback events are either dispatched immediately through
+    * {@code owner.invokeTXCallbacks} and then released, or, when {@code pendingCallbacks} is
+    * non-null, appended to that list and left unreleased by this method.
+    *
+    * @param key the key being destroyed
+    * @param txId the transaction id; cast to {@code TXId} to compare its member id with
+    *        {@code owner.getMyId()}
+    * @param txEvent passed to {@code checkBeforeEntrySync} and {@code createCBEvent}; when
+    *        non-null and an entry exists, the destroy is recorded on it with {@code addDestroy}
+    * @param inTokenMode when true, entries are marked {@code Token.DESTROYED} rather than removed
+    * @param inRI combined with {@code !inTokenMode} as the last argument of
+    *        {@code shouldCreateCBEvent}
+    * @param op the operation passed to {@code createCBEvent}
+    * @param eventId passed to {@code createCBEvent}
+    * @param aCallbackArgument passed to {@code createCBEvent} and {@code txEvent.addDestroy}
+    * @param pendingCallbacks if non-null, callback events are added here instead of being
+    *        invoked immediately
+    * @param filterRoutingInfo passed to {@code createCBEvent}
+    * @param bridgeContext passed to {@code createCBEvent}
+    * @param isOriginRemote not referenced in this method body (the remote-origin flag used
+    *        here is derived from {@code txId})
+    * @param txEntryState passed to the WAN/version helpers; when non-null and
+    *        {@code owner.concurrencyChecksEnabled}, it receives the event's version tag
+    * @param versionTag passed to the lock/unlock helpers and to {@code createCBEvent}
+    * @param tailKey passed to {@code createCBEvent}
+    * @throws DiskAccessException rethrown after {@code owner.handleDiskAccessException(dae)}
+    */
+   public final void txApplyDestroy(Object key, TransactionId txId,
+       TXRmtEvent txEvent, boolean inTokenMode, boolean inRI, Operation op, 
+       EventID eventId, Object aCallbackArgument,List<EntryEventImpl> pendingCallbacks,FilterRoutingInfo filterRoutingInfo,ClientProxyMembershipID bridgeContext,
+       boolean isOriginRemote, TXEntryState txEntryState, VersionTag versionTag, long tailKey)
+   {
+     final boolean isDebugEnabled = logger.isDebugEnabled();
+     // checkBeforeEntrySync is called on the owner before the TX modification lock is taken.
+     final LocalRegion owner = _getOwner();
+     owner.checkBeforeEntrySync(txEvent);
+     // hasRemoteOrigin is true when the transaction's member id differs from this member's id.
+     final boolean isRegionReady = !inTokenMode;
+     final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId());
+     boolean cbEventInPending = false; // set once a cbEvent is handed to pendingCallbacks; such events are not released here
+     lockForTXCacheModification(owner, versionTag);
+     IndexManager oqlIndexManager = owner.getIndexManager() ; 
+     try {
+       RegionEntry re = getEntry(key);
+       if (re != null) { // case 1: an entry exists for the key
+         // Fix for Bug #44431. We do NOT want to update the region and wait
+         // later for index INIT as region.clear() can cause inconsistency if
+         // happened in parallel as it also does index INIT.
+         if (oqlIndexManager != null) {
+           oqlIndexManager.waitForIndexInit();
+         }
+         try {
+           synchronized (re) {
+             if (!re.isRemoved() || re.isTombstone()) { // removed entries that are not tombstones are left untouched
+               EntryEventImpl sqlfEvent = null; // assigned only when indexUpdater != null
+               @Retained @Released Object oldValue = re.getValueInVM(owner); // released in the outer finally via OffHeapHelper.release
+               try {
+               final int oldSize = owner.calculateRegionEntryValueSize(re);
+               // Create an entry event only if the calling context is
+               // a receipt of a TXCommitMessage AND there are callbacks installed
+               // for this region
+               boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/, isRegionReady || inRI);
+               EntryEventImpl cbEvent = createCBEvent(owner, op,
+                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+               try {
+               // With an indexUpdater the event is kept for removeEntry(...); otherwise it is bound to re (after WAN handling for bucket regions).
+               if (/* owner.isUsedForPartitionedRegionBucket() && */ 
+                   indexUpdater != null) {
+                  sqlfEvent = cbEvent;
+               } else {
+                 if (owner.isUsedForPartitionedRegionBucket()) {
+                   txHandleWANEvent(owner, cbEvent, txEntryState);
+                 }
+                 cbEvent.setRegionEntry(re);
+               }
+               cbEvent.setOldValue(oldValue);
+               if (isDebugEnabled) {
+                 logger.debug("txApplyDestroy cbEvent={}", cbEvent);
+               }
+               // txRemoveOldIndexEntry and the txEvent bookkeeping run before the entry's value is changed below.
+               txRemoveOldIndexEntry(Operation.DESTROY, re);
+               if (txEvent != null) {
+                 txEvent.addDestroy(owner, re, re.getKey(),aCallbackArgument);
+               }
+               boolean clearOccured = false; // becomes true if a RegionClearedException is caught below
+               try {
+                 processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
+                 if (inTokenMode) { // token mode: mark the entry DESTROYED instead of removing it
+                   if (oldValue == Token.TOMBSTONE) {
+                     owner.unscheduleTombstone(re);
+                   }
+                   re.setValue(owner, Token.DESTROYED);
+                 }
+                 else {
+                   if (!re.isTombstone()) {
+                     if (sqlfEvent != null) {
+                       re.removePhase1(owner, false); // fix for bug 43063
+                       re.removePhase2();
+                       removeEntry(key, re, true, sqlfEvent, owner, indexUpdater);
+                     } else {
+                       if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
+                         re.makeTombstone(owner, cbEvent.getVersionTag()); // keep the entry as a tombstone carrying the event's version tag
+                       } else {
+                         re.removePhase1(owner, false); // fix for bug 43063
+                         re.removePhase2();
+                         removeEntry(key, re, false);
+                       }
+                     }
+                   } else {
+                     owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag()); // already a tombstone: only reschedule it
+                   }
+                 }
+                 EntryLogger.logTXDestroy(_getOwnerObject(), key);
+                 owner.updateSizeOnRemove(key, oldSize);
+               }
+               catch (RegionClearedException rce) {
+                 clearOccured = true; // reported to txApplyDestroyPart2 and used to skip lruEntryDestroy
+               }
+               owner.txApplyDestroyPart2(re, re.getKey(), inTokenMode,
+                   clearOccured /* Clear Conflicting with the operation */);
+               if (invokeCallbacks) {
+                 switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+                 if(pendingCallbacks==null) {
+                   owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
+                       cbEvent, true/*callDispatchListenerEvent*/);
+                 } else {
+                   pendingCallbacks.add(cbEvent); // deferred: the caller-supplied list now holds the event
+                   cbEventInPending = true;
+                 }
+               }
+               if (!clearOccured) {
+                 lruEntryDestroy(re);
+               }
+               if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent!= null) {
+                 txEntryState.setVersionTag(cbEvent.getVersionTag());
+               }
+               } finally {
+                 if (!cbEventInPending) cbEvent.release(); // events queued in pendingCallbacks are not released here
+               }
+               } finally {
+                 OffHeapHelper.release(oldValue);
+               }
+             }
+           }
+         } finally {
+           if (oqlIndexManager != null) {
+             oqlIndexManager.countDownIndexUpdaters();
+           }
+         }
+       } else if (inTokenMode || owner.concurrencyChecksEnabled) { // case 2: no entry, but a DESTROYED token / tombstone must be recorded
+         // treating tokenMode and re == null as same, since we now want to
+         // generate versions and Tombstones for destroys
+         boolean dispatchListenerEvent = inTokenMode;
+         boolean opCompleted = false;
+         RegionEntry newRe = getEntryFactory().createEntry(owner, key,
+             Token.DESTROYED);
+         if ( oqlIndexManager != null) {
+           oqlIndexManager.waitForIndexInit();
+         }
+         EntryEventImpl cbEvent = null;
+         try {
+           synchronized (newRe) {
+             RegionEntry oldRe = putEntryIfAbsent(key, newRe); // null means newRe was installed
+             while (!opCompleted && oldRe != null) {
+               synchronized (oldRe) {
+                 if (oldRe.isRemovedPhase2()) { // the found entry is in removed-phase-2: offer newRe again
+                   oldRe = putEntryIfAbsent(key, newRe);
+                   if (oldRe != null) {
+                     owner.getCachePerfStats().incRetries();
+                   }
+                 }
+                 else {
+                   try {
+                     boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
+                     cbEvent = createCBEvent(owner, op,
+                         key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+                     try {
+                     cbEvent.setRegionEntry(oldRe);
+                     cbEvent.setOldValue(Token.NOT_AVAILABLE);
+                     if (isDebugEnabled) {
+                       logger.debug("txApplyDestroy token mode cbEvent={}", cbEvent);
+                     }
+                     if (owner.isUsedForPartitionedRegionBucket()) {
+                       txHandleWANEvent(owner, cbEvent, txEntryState);
+                     }
+                     processAndGenerateTXVersionTag(owner, cbEvent, oldRe, txEntryState);
+                     if (invokeCallbacks) {
+                       switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+                       if(pendingCallbacks==null) {
+                         owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
+                             cbEvent, dispatchListenerEvent);
+                       } else {
+                         pendingCallbacks.add(cbEvent);
+                         cbEventInPending = true;
+                       }
+                     }
+                     int oldSize = 0; // stays 0 when the existing entry is a tombstone
+                     boolean wasTombstone = oldRe.isTombstone();
+                     {
+                       if (!wasTombstone) {
+                         oldSize = owner.calculateRegionEntryValueSize(oldRe);
+                       }
+                     }
+                     oldRe.setValue(owner, Token.DESTROYED);
+                     EntryLogger.logTXDestroy(_getOwnerObject(), key);
+                     if (wasTombstone) {
+                       owner.unscheduleTombstone(oldRe);
+                     }
+                     owner.updateSizeOnRemove(oldRe.getKey(), oldSize);
+                     owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
+                         false /* Clear Conflicting with the operation */);
+                     lruEntryDestroy(oldRe);
+                     } finally {
+                       if (!cbEventInPending) cbEvent.release();
+                     }
+                   }
+                   catch (RegionClearedException rce) {
+                     owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
+                         true /* Clear Conflicting with the operation */);
+                   }
+                   // NOTE(review): cbEvent may already have been released by the inner finally above; only its version tag is read from here on - confirm that is safe after release().
+                   if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
+                     oldRe.makeTombstone(owner, cbEvent.getVersionTag());
+                   } else if (!inTokenMode) {
+                     // only remove for NORMAL regions if they do not generate versions see 51781
+                     oldRe.removePhase1(owner, false); // fix for bug 43063
+                     oldRe.removePhase2();
+                     removeEntry(key, oldRe, false);
+                   }
+                   opCompleted = true;
+                 }
+               }
+             }
+             if (!opCompleted) { // reached when putEntryIfAbsent returned null, i.e. newRe is the entry in the map
+               // already has value set to Token.DESTROYED
+               opCompleted = true;
+               boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
+               cbEvent = createCBEvent(owner, op,
+                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+               try {
+               cbEvent.setRegionEntry(newRe);
+               cbEvent.setOldValue(Token.NOT_AVAILABLE);
+               if (isDebugEnabled) {
+                 logger.debug("txApplyDestroy token mode cbEvent={}", cbEvent);
+               }
+               if (owner.isUsedForPartitionedRegionBucket()) {
+                 txHandleWANEvent(owner, cbEvent, txEntryState);
+               }
+               processAndGenerateTXVersionTag(owner, cbEvent, newRe, txEntryState);
+               if (invokeCallbacks) {
+                 switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+                 if(pendingCallbacks==null) {
+                   owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
+                       cbEvent, dispatchListenerEvent);
+                 } else {
+                   pendingCallbacks.add(cbEvent);
+                   cbEventInPending = true;
+                 }
+               }
+               EntryLogger.logTXDestroy(_getOwnerObject(), key);
+               owner.updateSizeOnCreate(newRe.getKey(), 0);
+               if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
+                 newRe.makeTombstone(owner, cbEvent.getVersionTag());
+               } else if (!inTokenMode) {
+                 // only remove for NORMAL regions if they do not generate versions see 51781
+                 newRe.removePhase1(owner, false); // fix for bug 43063
+                 newRe.removePhase2();
+                 removeEntry(key, newRe, false);
+               }
+               owner
+                   .txApplyDestroyPart2(newRe, newRe.getKey(), inTokenMode,
+                       false /*clearConflict*/);
+               // Note no need for LRU work since the entry is destroyed
+               // and will be removed when gii completes
+               } finally {
+                 if (!cbEventInPending) cbEvent.release();
+               }
+             }
+             if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) {
+               txEntryState.setVersionTag(cbEvent.getVersionTag());
+             }
+           }
+         } catch (RegionClearedException e) {
+           // TODO: a RegionClearedException escaping the synchronized block above is ignored here.
+           // NOTE(review): unlike the inner catch, txApplyDestroyPart2 is not told about the clear
+           // conflict on this path - confirm that silently continuing is intended.
+         } finally {
+           if (oqlIndexManager != null) {
+             oqlIndexManager.countDownIndexUpdaters();
+           }
+         }
+       } else if (re == null) { // case 3: always true here, since the first branch handled re != null
+         // Fix bug#43594
+         // In cases where bucket region is re-created, it may so happen that 
+         // the destroy is already applied on the Initial image provider, thus 
+         // causing region entry to be absent. 
+         // Notify clients with client events.
+         EntryEventImpl cbEvent = createCBEvent(owner, op,
+             key, null, txId, txEvent, eventId, aCallbackArgument, 
+             filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+         try {
+         if (owner.isUsedForPartitionedRegionBucket()) {
+           txHandleWANEvent(owner, cbEvent, txEntryState);
+         }
+         switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+         if (pendingCallbacks == null) {
+           owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,cbEvent,false);
+         } else {
+           pendingCallbacks.add(cbEvent);
+           cbEventInPending = true;
+         }
+         } finally {
+           if (!cbEventInPending) cbEvent.release();
+         }
+       }
+     } catch( DiskAccessException dae) {
+       owner.handleDiskAccessException(dae); // let the region react to the disk failure, then propagate it
+       throw dae;
+     }
+     finally {
+       releaseTXCacheModificationLock(owner, versionTag);
+     }
+   }
+ 
+   /**
+    * When {@code true}, an invalidate that throws EntryNotFoundException, or that
+    * targets an entry that is already invalid, first calls afterInvalidate on CacheListeners.
+    * The old value on the event passed to afterInvalidate will be null.
+    * <p>
+    * The initial value is read from the {@code gemfire.FORCE_INVALIDATE_EVENT} system
+    * property via {@link Boolean#getBoolean(String)}. The field is not final, so it can
+    * be reassigned after class initialization.
+    */
+   public static boolean FORCE_INVALIDATE_EVENT = Boolean.getBoolean("gemfire.FORCE_INVALIDATE_EVENT");
+ 
+   /**
+    * Invokes callbacks for {@code event} against the owning region, but only
+    * while {@link #FORCE_INVALIDATE_EVENT} is set; otherwise this is a no-op.
+    *
+    * @param event the event whose {@code invokeCallbacks} is called with the
+    *        owner returned by {@code _getOwner()} and two {@code false} flags
+    */
+   void forceInvalidateEvent(EntryEventImpl event) {
+     if (!FORCE_INVALIDATE_EVENT) {
+       // Flag is off: nothing to force.
+       return;
+     }
+     event.invokeCallbacks(_getOwner(), false, false);
+   }
+   
+   public final boolean invalidate(EntryEventImpl event,
+       boolean invokeCallbacks, boolean forceNewEntry, boolean forceCallbacks)
+       throws EntryNotFoundException
+   {
+     final boolean isDebugEnabled = logger.isDebugEnabled();
+     
+     final LocalRegion owner = _getOwner();
+     if (owner == null) {
+       // "fix" for bug 32440
+       Ass

<TRUNCATED>