Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:38 UTC
[76/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 0000000,6fe60ce..699de2f
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@@ -1,0 -1,4164 +1,4164 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package com.gemstone.gemfire.internal.cache;
+
+
+ import java.io.IOException;
+ import java.lang.reflect.Method;
+ import java.util.Collection;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+ import com.gemstone.gemfire.internal.cache.region.entry.RegionEntryFactoryBuilder;
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.GemFireIOException;
+ import com.gemstone.gemfire.InvalidDeltaException;
+ import com.gemstone.gemfire.cache.CacheRuntimeException;
+ import com.gemstone.gemfire.cache.CacheWriter;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+ import com.gemstone.gemfire.cache.DiskAccessException;
+ import com.gemstone.gemfire.cache.EntryExistsException;
+ import com.gemstone.gemfire.cache.EntryNotFoundException;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.TimeoutException;
+ import com.gemstone.gemfire.cache.TransactionId;
+ import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
+ import com.gemstone.gemfire.cache.query.QueryException;
+ import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.ClassPathLoader;
+ import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag;
+ import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
+ import com.gemstone.gemfire.internal.cache.delta.Delta;
+ import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+ import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
+ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+ import com.gemstone.gemfire.internal.cache.versions.VersionHolder;
+ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+ import com.gemstone.gemfire.internal.concurrent.MapCallbackAdapter;
+ import com.gemstone.gemfire.internal.concurrent.MapResult;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
+ import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
+ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
+ import com.gemstone.gemfire.pdx.PdxInstance;
+ import com.gemstone.gemfire.pdx.PdxSerializationException;
+ import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
+
+ /**
+ * Abstract implementation of {@link RegionMap} that has all the common
+ * behavior.
+ *
+ * @since 3.5.1
+ *
+ * @author Darrel Schneider
+ *
+ */
+
+ //Asif: In the case of a SqlFabric system, we create a different set of RegionEntry
+ // classes which are derived from the concrete GFE RegionEntry classes.
+ // In the future, if any new concrete RegionEntry class is defined, new SqlFabric
+ // RegionEntry classes need to be created. There is a JUnit test in SqlFabric
+ // which checks the GFE RegionEntry classes and validates them against its
+ // own classes.
+
+ public abstract class AbstractRegionMap implements RegionMap {
+
+ private static final Logger logger = LogService.getLogger();
+
+ /** The underlying map for this region. */
+ protected CustomEntryConcurrentHashMap<Object, Object> map;
+ /** An internal Listener for index maintenance for SQLFabric. */
+ private final IndexUpdater indexUpdater;
+
+ /**
+ * This test hook is used to force the conditions for defect 48182.
+ * This hook is used by Bug48182JUnitTest.
+ */
+ static Runnable testHookRunnableFor48182 = null;
+
+ private RegionEntryFactory entryFactory;
+ private Attributes attr;
+ private transient Object owner; // the region that owns this map
+
+ protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) {
+ if (internalRegionArgs != null) {
+ this.indexUpdater = internalRegionArgs.getIndexUpdater();
+ }
+ else {
+ this.indexUpdater = null;
+ }
+ }
+
+ public final IndexUpdater getIndexUpdater() {
+ return this.indexUpdater;
+ }
+
+ protected void initialize(Object owner,
+ Attributes attr,
+ InternalRegionArguments internalRegionArgs,
+ boolean isLRU) {
+ _setAttributes(attr);
+ setOwner(owner);
+ _setMap(createConcurrentMap(attr.initialCapacity, attr.loadFactor,
+ attr.concurrencyLevel, false,
+ new AbstractRegionEntry.HashRegionEntryCreator()));
+
+ final GemFireCacheImpl cache;
+ boolean isDisk;
+ boolean withVersioning = false;
+ boolean offHeap = false;
+ if (owner instanceof LocalRegion) {
+ LocalRegion region = (LocalRegion)owner;
+ isDisk = region.getDiskRegion() != null;
+ cache = region.getGemFireCache();
+ withVersioning = region.getConcurrencyChecksEnabled();
+ offHeap = region.getOffHeap();
+ }
+ else if (owner instanceof PlaceHolderDiskRegion) {
+ offHeap = ((PlaceHolderDiskRegion) owner).getOffHeap();
+ isDisk = true;
+ withVersioning = ((PlaceHolderDiskRegion)owner).getFlags().contains(
+ DiskRegionFlag.IS_WITH_VERSIONING);
+ cache = GemFireCacheImpl.getInstance();
+ }
+ else {
+ throw new IllegalStateException(
+ "expected LocalRegion or PlaceHolderDiskRegion");
+ }
+
+ if (cache != null && cache.isSqlfSystem()) {
+ String provider = GemFireCacheImpl.SQLF_ENTRY_FACTORY_PROVIDER;
+ try {
+ Class<?> factoryProvider = ClassPathLoader.getLatest().forName(provider);
+ Method method = factoryProvider.getDeclaredMethod(
+ "getRegionEntryFactory", new Class[] { Boolean.TYPE, Boolean.TYPE,
+ Boolean.TYPE, Object.class, InternalRegionArguments.class });
+ RegionEntryFactory ref = (RegionEntryFactory)method.invoke(null,
+ new Object[] { Boolean.valueOf(attr.statisticsEnabled),
+ Boolean.valueOf(isLRU), Boolean.valueOf(isDisk), owner,
+ internalRegionArgs });
+
+ // TODO need to have the SQLF entry factory support version stamp storage
+ setEntryFactory(ref);
+
+ }
+ catch (Exception e) {
+ throw new CacheRuntimeException(
+ "Exception in obtaining RegionEntry Factory" + " provider class ",
+ e) {
+ };
+ }
+ }
+ else {
+ setEntryFactory(new RegionEntryFactoryBuilder().getRegionEntryFactoryOrNull(attr.statisticsEnabled,isLRU,isDisk,withVersioning,offHeap));
+ }
+ }
+
+ protected CustomEntryConcurrentHashMap<Object, Object> createConcurrentMap(
+ int initialCapacity, float loadFactor, int concurrencyLevel,
+ boolean isIdentityMap,
+ CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> entryCreator) {
+ if (entryCreator != null) {
+ return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity, loadFactor,
+ concurrencyLevel, isIdentityMap, entryCreator);
+ }
+ else {
+ return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity,
+ loadFactor, concurrencyLevel, isIdentityMap);
+ }
+ }
+
+ public void changeOwner(LocalRegion r) {
+ if (r == _getOwnerObject()) {
+ return;
+ }
+ setOwner(r);
+ }
+
+ @Override
+ public final void setEntryFactory(RegionEntryFactory f) {
+ this.entryFactory = f;
+ }
+
+ public final RegionEntryFactory getEntryFactory() {
+ return this.entryFactory;
+ }
+
+ protected final void _setAttributes(Attributes a) {
+ this.attr = a;
+ }
+
+ public final Attributes getAttributes() {
+ return this.attr;
+ }
+
+ protected final LocalRegion _getOwner() {
+ return (LocalRegion)this.owner;
+ }
+
+ protected boolean _isOwnerALocalRegion() {
+ return this.owner instanceof LocalRegion;
+ }
+
+ protected final Object _getOwnerObject() {
+ return this.owner;
+ }
+
+ public final void setOwner(Object r) {
+ this.owner = r;
+ }
+
+ protected final CustomEntryConcurrentHashMap<Object, Object> _getMap() {
+ return this.map;
+ }
+
+ protected final void _setMap(CustomEntryConcurrentHashMap<Object, Object> m) {
+ this.map = m;
+ }
+
+ public int size()
+ {
+ return _getMap().size();
+ }
+
+ // this is currently used by stats and eviction
+ @Override
+ public int sizeInVM() {
+ return _getMap().size();
+ }
+
+ public boolean isEmpty()
+ {
+ return _getMap().isEmpty();
+ }
+
+ public Set keySet()
+ {
+ return _getMap().keySet();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Collection<RegionEntry> regionEntries() {
+ return (Collection)_getMap().values();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public Collection<RegionEntry> regionEntriesInVM() {
+ return (Collection)_getMap().values();
+ }
+
+ public final boolean containsKey(Object key) {
+ RegionEntry re = getEntry(key);
+ if (re == null) {
+ return false;
+ }
+ if (re.isRemoved()) {
+ return false;
+ }
+ return true;
+ }
+
+ public RegionEntry getEntry(Object key) {
+ RegionEntry re = (RegionEntry)_getMap().get(key);
+ if (re != null && re.isMarkedForEviction()) {
+ // entry has been faulted in from HDFS
+ return null;
+ }
+ return re;
+ }
+
+ protected RegionEntry getEntry(EntryEventImpl event) {
+ return getEntry(event.getKey());
+ }
+
+
+ @Override
+ public final RegionEntry getEntryInVM(Object key) {
+ return (RegionEntry)_getMap().get(key);
+ }
+
+
+ public final RegionEntry putEntryIfAbsent(Object key, RegionEntry re) {
+ RegionEntry value = (RegionEntry)_getMap().putIfAbsent(key, re);
+ if (value == null && (re instanceof OffHeapRegionEntry)
+ && _isOwnerALocalRegion() && _getOwner().isThisRegionBeingClosedOrDestroyed()) {
+ // prevent orphan during concurrent destroy (#48068)
+ if (_getMap().remove(key, re)) {
+ ((OffHeapRegionEntry)re).release();
+ }
+ _getOwner().checkReadiness(); // throw RegionDestroyedException
+ }
+ return value;
+ }
+
+ @Override
+ public final RegionEntry getOperationalEntryInVM(Object key) {
+ RegionEntry re = (RegionEntry)_getMap().get(key);
+ if (re != null && re.isMarkedForEviction()) {
+ // entry has been faulted in from HDFS
+ return null;
+ }
+ return re;
+ }
+
+
+ public final void removeEntry(Object key, RegionEntry re, boolean updateStat) {
+ if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
+ return; // can't remove tombstones except from the tombstone sweeper
+ }
+ if (_getMap().remove(key, re)) {
+ re.removePhase2();
+ if (updateStat) {
+ incEntryCount(-1);
+ }
+ }
+ }
+
+ public final void removeEntry(Object key, RegionEntry re, boolean updateStat,
+ EntryEventImpl event, final LocalRegion owner,
+ final IndexUpdater indexUpdater) {
+ boolean success = false;
+ if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
+ return; // can't remove tombstones except from the tombstone sweeper
+ }
+ try {
+ if (indexUpdater != null) {
+ indexUpdater.onEvent(owner, event, re);
+ }
+
+ //This is messy, but custom eviction calls removeEntry
+ //rather than re.destroy, I think, to avoid firing callbacks, etc.
+ //However, removePhase1 still needs to be called on the entry
+ //in order to remove it from disk.
+ if(event.isCustomEviction() && !re.isRemoved()) {
+ try {
+ re.removePhase1(owner, false);
+ } catch (RegionClearedException e) {
+ //that's ok, we were just trying to do the eviction
+ }
+ }
+
+ if (_getMap().remove(key, re)) {
+ re.removePhase2();
+ success = true;
+ if (updateStat) {
+ incEntryCount(-1);
+ }
+ }
+ } finally {
+ if (indexUpdater != null) {
+ indexUpdater.postEvent(owner, event, re, success);
+ }
+ }
+ }
+
+ protected final void incEntryCount(int delta) {
+ LocalRegion lr = _getOwner();
+ if (lr != null) {
+ CachePerfStats stats = lr.getCachePerfStats();
+ if (stats != null) {
+ stats.incEntryCount(delta);
+ }
+ }
+ }
+
+ final void incClearCount(LocalRegion lr) {
+ if (lr != null && !(lr instanceof HARegion)) {
+ CachePerfStats stats = lr.getCachePerfStats();
+ if (stats != null) {
+ stats.incClearCount();
+ }
+ }
+ }
+
+ private void _mapClear() {
+ _getMap().clear();
+ }
+
+ public void close() {
+ /*
+ for (SuspectEntryList l: this.suspectEntries.values()) {
+ for (EntryEventImpl e: l) {
+ e.release();
+ }
+ }
+ */
+ clear(null);
+ }
+
+ /**
+ * Clear the region and, if an RVV is given, return a collection of the
+ * version sources in all remaining tags
+ */
+ public Set<VersionSource> clear(RegionVersionVector rvv)
+ {
+ Set<VersionSource> result = new HashSet<VersionSource>();
+
+ if(!_isOwnerALocalRegion()) {
+ //Fix for #41333. Just clear the map
+ //if we failed during initialization.
+ _mapClear();
+ return null;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clearing entries for {} rvv={}", _getOwner(), " rvv=" + rvv);
+ }
+ LocalRegion lr = _getOwner();
+ RegionVersionVector localRvv = lr.getVersionVector();
+ incClearCount(lr);
+ // lock for size calcs if the region might have tombstones
+ Object lockObj = lr.getConcurrencyChecksEnabled()? lr.getSizeGuard() : new Object();
+ synchronized (lockObj) {
+ if (rvv == null) {
+ int delta = 0;
+ try {
+ delta = sizeInVM(); // TODO soplog need to determine if stats should
+ // reflect only size in memory or the complete thing
+ } catch (GemFireIOException e) {
+ // ignore rather than throwing an exception during cache close
+ }
+ int tombstones = lr.getTombstoneCount();
+ _mapClear();
+ _getOwner().updateSizeOnClearRegion(delta - tombstones);
+ _getOwner().incTombstoneCount(-tombstones);
+ if (delta != 0) {
+ incEntryCount(-delta);
+ }
+ } else {
+ int delta = 0;
+ int tombstones = 0;
+ VersionSource myId = _getOwner().getVersionMember();
+ if (localRvv != rvv) {
+ localRvv.recordGCVersions(rvv);
+ }
+ final boolean isTraceEnabled = logger.isTraceEnabled();
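+ // Only entries whose version is contained in the supplied RVV are removed;
+ // entries with newer versions are retained and their version sources are
+ // added to the result returned to the caller.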
+ for (RegionEntry re : regionEntries()) {
+ synchronized(re) {
+ Token value = re.getValueAsToken();
+ // if it's already being removed or the entry is being created we leave it alone
+ if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2) {
+ continue;
+ }
+
+ VersionSource id = re.getVersionStamp().getMemberID();
+ if (id == null) {
+ id = myId;
+ }
+ if (rvv.contains(id, re.getVersionStamp().getRegionVersion())) {
+ if (isTraceEnabled) {
+ logger.trace("region clear op is removing {} {}", re.getKey(), re.getVersionStamp());
+ }
+
+ boolean tombstone = re.isTombstone();
+ // note: it.remove() did not reliably remove the entry so we use remove(K,V) here
+ if (_getMap().remove(re.getKey(), re)) {
+ if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
+ GatewaySenderEventImpl.release(re._getValue()); // OFFHEAP _getValue ok
+ }
+ //If this is an overflow only region, we need to free the entry on
+ //disk at this point.
+ try {
+ re.removePhase1(lr, true);
+ } catch (RegionClearedException e) {
+ //do nothing, it's already cleared.
+ }
+ re.removePhase2();
+ lruEntryDestroy(re);
+ if (tombstone) {
+ _getOwner().incTombstoneCount(-1);
+ tombstones += 1;
+ } else {
+ delta += 1;
+ }
+ }
+ } else { // rvv does not contain this entry so it is retained
+ result.add(id);
+ }
+ }
+ }
+ _getOwner().updateSizeOnClearRegion(delta);
+ incEntryCount(-delta);
+ incEntryCount(-tombstones);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Size after clearing = {}", _getMap().size());
+ }
+ if (isTraceEnabled && _getMap().size() < 20) {
+ _getOwner().dumpBackingMap();
+ }
+ }
+ }
+ return result;
+ }
+
+ public void lruUpdateCallback()
+ {
+ // By default do nothing; LRU maps need to override this method
+ }
+ public void lruUpdateCallback(boolean b)
+ {
+ // By default do nothing; LRU maps need to override this method
+ }
+ public void lruUpdateCallback(int i)
+ {
+ // By default do nothing; LRU maps need to override this method
+ }
+
+ public boolean disableLruUpdateCallback()
+ {
+ // By default do nothing; LRU maps need to override this method
+ return false;
+ }
+
+ public void enableLruUpdateCallback()
+ {
+ // By default do nothing; LRU maps need to override this method
+ }
+
+ public void resetThreadLocals()
+ {
+ // By default do nothing; LRU maps need to override this method
+ }
+
+ /**
+ * Tell an LRU that a new entry has been created
+ */
+ protected void lruEntryCreate(RegionEntry e)
+ {
+ // do nothing by default
+ }
+
+ /**
+ * Tell an LRU that an existing entry has been destroyed
+ */
+ protected void lruEntryDestroy(RegionEntry e)
+ {
+ // do nothing by default
+ }
+
+ /**
+ * Tell an LRU that an existing entry has been modified
+ */
+ protected void lruEntryUpdate(RegionEntry e)
+ {
+ // do nothing by default
+ }
+
+ @Override
+ public void decTxRefCount(RegionEntry e)
+ {
+ LocalRegion lr = null;
+ if (_isOwnerALocalRegion()) {
+ lr = _getOwner();
+ }
+ e.decRefCount(null, lr);
+ }
+
+ public boolean lruLimitExceeded() {
+ return false;
+ }
+
+ public void lruCloseStats() {
+ // do nothing by default
+ }
+
+ public void lruEntryFaultIn(LRUEntry entry) {
+ // do nothing by default
+ }
+
+ /**
+ * Process an incoming version tag for concurrent operation detection.
+ * This must be done before modifying the region entry.
+ * @param re the entry that is to be modified
+ * @param event the modification to the entry
+ * @throws InvalidDeltaException if the event contains a delta that cannot be applied
+ * @throws ConcurrentCacheModificationException if the event is in conflict
+ * with a previously applied change
+ */
+ private void processVersionTag(RegionEntry re, EntryEventImpl event) {
+ if (re.getVersionStamp() != null) {
+ re.getVersionStamp().processVersionTag(event);
+
+ // during initialization we record version tag info to detect ops the
+ // image provider hasn't seen
+ VersionTag<?> tag = event.getVersionTag();
+ if (tag != null && !event.getRegion().isInitialized()) {
+ ImageState is = event.getRegion().getImageState();
+ if (is != null && !event.getRegion().isUsedForPartitionedRegionBucket()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("recording version tag in image state: {}", tag);
+ }
+ is.addVersionTag(event.getKey(), tag);
+ }
+ }
+ }
+ }
+
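+ // Applies a version tag received during getInitialImage (GII) to the entry's
+ // version stamp, optionally checking for conflicts with concurrent changes.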
+ private void processVersionTagForGII(RegionEntry re, LocalRegion owner, VersionTag entryVersion, boolean isTombstone, InternalDistributedMember sender, boolean checkConflicts) {
+
+ re.getVersionStamp().processVersionTag(_getOwner(), entryVersion, isTombstone, false, owner.getMyId(), sender, checkConflicts);
+ }
+
+ public void copyRecoveredEntries(RegionMap rm) {
+ //We need to sort the tombstones before scheduling them,
+ //so that they will be in the correct order.
+ OrderedTombstoneMap<RegionEntry> tombstones = new OrderedTombstoneMap<RegionEntry>();
+ if (rm != null) {
+ CustomEntryConcurrentHashMap<Object, Object> other = ((AbstractRegionMap)rm)._getMap();
+ Iterator<Map.Entry<Object, Object>> it = other
+ .entrySetWithReusableEntries().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Object, Object> me = it.next();
+ it.remove(); // This removes the RegionEntry from "rm" but it does not decrement the refcount of an off-heap value it may reference.
+ RegionEntry oldRe = (RegionEntry)me.getValue();
+ Object key = me.getKey();
+
+ @Retained @Released Object value = oldRe._getValueRetain((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true);
+
+ try {
+ if (value == Token.NOT_AVAILABLE) {
+ // fix for bug 43993
+ value = null;
+ }
+ if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) {
+ continue;
+ }
+ RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
+ copyRecoveredEntry(oldRe, newRe);
+ // newRe is now in this._getMap().
+ if (newRe.isTombstone()) {
+ VersionTag tag = newRe.getVersionStamp().asVersionTag();
+ tombstones.put(tag, newRe);
+ }
+ _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
+ incEntryCount(1);
+ lruEntryUpdate(newRe);
+ } finally {
+ if (OffHeapHelper.release(value)) {
+ ((OffHeapRegionEntry)oldRe).release();
+ }
+ }
+ lruUpdateCallback();
+ }
+ } else {
+ incEntryCount(size());
+ for (Iterator<RegionEntry> iter = regionEntries().iterator(); iter.hasNext(); ) {
+ RegionEntry re = iter.next();
+ if (re.isTombstone()) {
+ if (re.getVersionStamp() == null) { // bug #50992 - recovery from versioned to non-versioned
+ incEntryCount(-1);
+ iter.remove();
+ continue;
+ } else {
+ tombstones.put(re.getVersionStamp().asVersionTag(), re);
+ }
+ }
+ _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re));
+ }
+ // Since LRU was not being done during recovery, call it now.
+ lruUpdateCallback();
+ }
+
+ //Schedule all of the tombstones, now that we have sorted them
+ Map.Entry<VersionTag, RegionEntry> entry;
+ while((entry = tombstones.take()) != null) {
+ // refresh the tombstone so it doesn't time out too soon
+ _getOwner().scheduleTombstone(entry.getValue(), entry.getKey());
+ }
+
+ }
+
+ protected void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe) {
+ if(newRe.getVersionStamp() != null) {
+ newRe.getVersionStamp().setMemberID(oldRe.getVersionStamp().getMemberID());
+ newRe.getVersionStamp().setVersions(oldRe.getVersionStamp().asVersionTag());
+ }
+
+ if (newRe instanceof AbstractOplogDiskRegionEntry) {
+ ((AbstractOplogDiskRegionEntry)newRe).setDiskId(oldRe);
+ _getOwner().getDiskRegion().replaceIncompatibleEntry((DiskEntry) oldRe, (DiskEntry) newRe);
+ }
+ _getMap().put(newRe.getKey(), newRe);
+ }
+
+ @Retained // Region entry may contain an off-heap value
+ public final RegionEntry initRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
+ boolean needsCallback = false;
+ @Retained RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
+ synchronized (newRe) {
+ if (value.getVersionTag()!=null && newRe.getVersionStamp()!=null) {
+ newRe.getVersionStamp().setVersions(value.getVersionTag());
+ }
+ RegionEntry oldRe = putEntryIfAbsent(key, newRe);
+ while (oldRe != null) {
+ synchronized (oldRe) {
+ if (oldRe.isRemoved() && !oldRe.isTombstone()) {
+ oldRe = putEntryIfAbsent(key, newRe);
+ if (oldRe != null) {
+ if (_isOwnerALocalRegion()) {
+ _getOwner().getCachePerfStats().incRetries();
+ }
+ }
+ }
+ /*
+ * Entry already exists which should be impossible.
+ * Free the current entry (if off-heap) and
+ * throw an exception.
+ */
+ else {
+ if (newRe instanceof OffHeapRegionEntry) {
+ ((OffHeapRegionEntry) newRe).release();
+ }
+
+ throw new IllegalStateException("Could not recover entry for key " + key + ". The entry already exists!");
+ }
+ } // synchronized
+ }
+ if (_isOwnerALocalRegion()) {
+ _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
+ if (newRe.isTombstone()) {
+ // refresh the tombstone so it doesn't time out too soon
+ _getOwner().scheduleTombstone(newRe, newRe.getVersionStamp().asVersionTag());
+ }
+
+ incEntryCount(1); // we are creating an entry that was recovered from disk including tombstone
+ }
+ lruEntryUpdate(newRe);
+ needsCallback = true;
+ }
+ if (needsCallback) {
+ lruUpdateCallback();
+ }
+
+ EntryLogger.logRecovery(_getOwnerObject(), key, value);
+
+ return newRe;
+ }
+
+ public final RegionEntry updateRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
+ boolean needsCallback = false;
+ RegionEntry re = getEntry(key);
+ if (re == null) {
+ return null;
+ }
+ synchronized (re) {
+ if (re.isRemoved() && !re.isTombstone()) {
+ return null;
+ }
+ if (value.getVersionTag()!=null && re.getVersionStamp()!=null) {
+ re.getVersionStamp().setVersions(value.getVersionTag());
+ }
+ try {
+ if (_isOwnerALocalRegion()) {
+ if (re.isTombstone()) {
+ // when a tombstone is to be overwritten, unschedule it first
+ _getOwner().unscheduleTombstone(re);
+ }
+ final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
+ re.setValue(_getOwner(), value); // OFFHEAP no need to call AbstractRegionMap.prepareValueForCache because setValue is overridden for disk and that code takes apart value (RecoveredEntry) and prepares its nested value for the cache
+ if (re.isTombstone()) {
+ _getOwner().scheduleTombstone(re, re.getVersionStamp().asVersionTag());
+ }
+ _getOwner().updateSizeOnPut(key, oldSize, _getOwner().calculateRegionEntryValueSize(re));
+ } else {
+ DiskEntry.Helper.updateRecoveredEntry((PlaceHolderDiskRegion)_getOwnerObject(),
+ (DiskEntry)re, value, (RegionEntryContext) _getOwnerObject());
+ }
+ } catch (RegionClearedException rce) {
+ throw new IllegalStateException("RegionClearedException should never happen in this context", rce);
+ }
+ lruEntryUpdate(re);
+ needsCallback = true;
+ }
+ if (needsCallback) {
+ lruUpdateCallback();
+ }
+
+ EntryLogger.logRecovery(_getOwnerObject(), key, value);
+
+ return re;
+ }
+
+ public final boolean initialImagePut(final Object key,
+ final long lastModified,
+ Object newValue,
+ final boolean wasRecovered,
+ boolean deferLRUCallback,
+ VersionTag entryVersion, InternalDistributedMember sender, boolean isSynchronizing)
+ {
+ boolean result = false;
+ boolean done = false;
+ boolean cleared = false;
+ final LocalRegion owner = _getOwner();
+
+ if (newValue == Token.TOMBSTONE && !owner.getConcurrencyChecksEnabled()) {
+ return false;
+ }
+
+ if (owner instanceof HARegion && newValue instanceof CachedDeserializable) {
+ Object actualVal = ((CachedDeserializable)newValue)
+ .getDeserializedValue(null, null);
+ if (actualVal instanceof HAEventWrapper) {
+ HAEventWrapper haEventWrapper = (HAEventWrapper)actualVal;
+ // Key was removed at sender side so not putting it into the HARegion
+ if (haEventWrapper.getClientUpdateMessage() == null) {
+ return false;
+ }
+ // Getting the instance from the singleton CCN. This assumes only one bridge
+ // server in the VM
+ HAContainerWrapper haContainer = (HAContainerWrapper)CacheClientNotifier
+ .getInstance().getHaContainer();
+ Map.Entry entry = null;
+ HAEventWrapper original = null;
+ synchronized (haContainer) {
+ entry = (Map.Entry)haContainer.getEntry(haEventWrapper);
+ if (entry != null) {
+ original = (HAEventWrapper)entry.getKey();
+ original.incAndGetReferenceCount();
+ }
+ else {
+ haEventWrapper.incAndGetReferenceCount();
+ haEventWrapper.setHAContainer(haContainer);
+ haContainer.put(haEventWrapper, haEventWrapper
+ .getClientUpdateMessage());
+ haEventWrapper.setClientUpdateMessage(null);
+ haEventWrapper.setIsRefFromHAContainer(true);
+ }
+ }
+ if (entry != null) {
+ HARegionQueue.addClientCQsAndInterestList(entry, haEventWrapper,
+ haContainer, owner.getName());
+ haEventWrapper.setClientUpdateMessage(null);
+ newValue = CachedDeserializableFactory.create(original,
+ ((CachedDeserializable)newValue).getSizeInBytes());
+ }
+ }
+ }
+
+ try {
+ RegionEntry newRe = getEntryFactory().createEntry(owner, key,
+ Token.REMOVED_PHASE1);
+ EntryEventImpl event = null;
+
+ @Retained @Released Object oldValue = null;
+
+ try {
+ RegionEntry oldRe = null;
+ synchronized (newRe) {
+ try {
+ oldRe = putEntryIfAbsent(key, newRe);
+ while (!done && oldRe != null) {
+ synchronized (oldRe) {
+ if (oldRe.isRemovedPhase2()) {
+ oldRe = putEntryIfAbsent(key, newRe);
+ if (oldRe != null) {
+ owner.getCachePerfStats().incRetries();
+ }
+ }
+ else {
+ boolean acceptedVersionTag = false;
+ if (entryVersion != null && owner.concurrencyChecksEnabled) {
+ Assert.assertTrue(entryVersion.getMemberID() != null, "GII entry versions must have identifiers");
+ try {
+ boolean isTombstone = (newValue == Token.TOMBSTONE);
+ // don't reschedule the tombstone if it hasn't changed
+ boolean isSameTombstone = oldRe.isTombstone() && isTombstone
+ && oldRe.getVersionStamp().asVersionTag()
+ .equals(entryVersion);
+ if (isSameTombstone) {
+ return true;
+ }
+ processVersionTagForGII(oldRe, owner, entryVersion, isTombstone, sender, !wasRecovered || isSynchronizing);
+ acceptedVersionTag = true;
+ } catch (ConcurrentCacheModificationException e) {
+ return false;
+ }
+ }
+ final boolean oldIsTombstone = oldRe.isTombstone();
+ final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
+ // Neeraj: The below if block handles a special scenario
+ // witnessed in SqlFabric for now (though it is a general scenario).
+ // During GII it is possible that updates start arriving before the
+ // base value has come through GII. In that case the deltas
+ // for that particular key keep getting added to a list of deltas.
+ // When the base value arrives through this GII path, the oldValue
+ // will be that list of deltas, and the deltas are applied to the
+ // base value one by one.
+ // The same scenario applies to GemFire as well, but the below
+ // code is executed only in the case of SqlFabric for now. The code
+ // could probably be made more generic for both SqlFabric and GemFire.
+ if (indexUpdater != null) {
+ oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas
+ if (oldValue instanceof ListOfDeltas) {
+ // apply the deltas on this new value. update index
+ // Make a new event object
+ // make it an insert operation
+ LocalRegion rgn = owner;
+ if (owner instanceof BucketRegion) {
+ rgn = ((BucketRegion)owner).getPartitionedRegion();
+ }
+ event = EntryEventImpl.create(rgn, Operation.CREATE, key, null,
+ Boolean.TRUE /* indicate that GII is in progress */,
+ false, null);
+ try {
+ event.setOldValue(newValue);
+ if (logger.isDebugEnabled()) {
+ logger.debug("initialImagePut: received base value for list of deltas; event: {}", event);
+ }
+ ((ListOfDeltas)oldValue).apply(event);
+ Object preparedNewValue =oldRe.prepareValueForCache(owner,
+ event.getNewValueAsOffHeapDeserializedOrRaw(), true);
- if(preparedNewValue instanceof Chunk) {
++ if(preparedNewValue instanceof ObjectChunk) {
+ event.setNewValue(preparedNewValue);
+ }
+ oldRe.setValue(owner, preparedNewValue, event);
+ //event.setNewValue(event.getOldValue());
+ event.setOldValue(null);
+ try {
+ indexUpdater.onEvent(owner, event, oldRe);
+ lruEntryUpdate(oldRe);
+ owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(oldRe));
+ EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
+ result = true;
+ done = true;
+ break;
+ } finally {
+ // this must be done within the oldRe sync block
+ indexUpdater.postEvent(owner, event, oldRe, done);
+ }
+ } finally {
+ if (event != null) {
+ event.release();
+ event = null;
+ }
+ }
+ }
+ }
+ try {
+ if (indexUpdater != null) {
+ event = EntryEventImpl.create(owner, Operation.CREATE, key,
+ newValue,
+ Boolean.TRUE /* indicate that GII is in progress */,
+ false, null);
+ indexUpdater.onEvent(owner, event, oldRe);
+ }
+ result = oldRe.initialImagePut(owner, lastModified, newValue, wasRecovered, acceptedVersionTag);
+ if (result) {
+ if (oldIsTombstone) {
+ owner.unscheduleTombstone(oldRe);
+ if (newValue != Token.TOMBSTONE){
+ lruEntryCreate(oldRe);
+ } else {
+ lruEntryUpdate(oldRe);
+ }
+ }
+ if (newValue == Token.TOMBSTONE) {
+ if (owner.getServerProxy() == null &&
+ owner.getVersionVector().isTombstoneTooOld(entryVersion.getMemberID(), entryVersion.getRegionVersion())) {
+ // the received tombstone has already been reaped, so don't retain it
+ removeTombstone(oldRe, entryVersion, false, false);
+ return false;
+ } else {
+ owner.scheduleTombstone(oldRe, entryVersion);
+ lruEntryDestroy(oldRe);
+ }
+ } else {
+ int newSize = owner.calculateRegionEntryValueSize(oldRe);
+ if(!oldIsTombstone) {
+ owner.updateSizeOnPut(key, oldSize, newSize);
+ } else {
+ owner.updateSizeOnCreate(key, newSize);
+ }
+ EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
+ }
+ }
+ if (owner.getIndexManager() != null) {
+ owner.getIndexManager().updateIndexes(oldRe, oldRe.isRemoved() ? IndexManager.ADD_ENTRY : IndexManager.UPDATE_ENTRY,
+ oldRe.isRemoved() ? IndexProtocol.OTHER_OP : IndexProtocol.AFTER_UPDATE_OP);
+ }
+ done = true;
+ } finally {
+ if (indexUpdater != null) {
+ indexUpdater.postEvent(owner, event, oldRe, result);
+ }
+ if (event != null) {
+ event.release();
+ event = null;
+ }
+ }
+ }
+ }
+ }
+ if (!done) {
+ boolean versionTagAccepted = false;
+ if (entryVersion != null && owner.concurrencyChecksEnabled) {
+ Assert.assertTrue(entryVersion.getMemberID() != null, "GII entry versions must have identifiers");
+ try {
+ boolean isTombstone = (newValue == Token.TOMBSTONE);
+ processVersionTagForGII(newRe, owner, entryVersion, isTombstone, sender, !wasRecovered || isSynchronizing);
+ versionTagAccepted = true;
+ } catch (ConcurrentCacheModificationException e) {
+ return false;
+ }
+ }
+ result = newRe.initialImageInit(owner, lastModified, newValue,
+ true, wasRecovered, versionTagAccepted);
+ try {
+ if (result) {
+ if (indexUpdater != null) {
+ event = EntryEventImpl.create(owner, Operation.CREATE, key,
+ newValue,
+ Boolean.TRUE /* indicate that GII is in progress */,
+ false, null);
+ indexUpdater.onEvent(owner, event, newRe);
+ }
+ if (newValue == Token.TOMBSTONE) {
+ owner.scheduleTombstone(newRe, entryVersion);
+ } else {
+ owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(newRe));
+ EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
+ lruEntryCreate(newRe);
+ }
+ incEntryCount(1);
+ }
+ //Update local indexes
+ if (owner.getIndexManager() != null) {
+ owner.getIndexManager().updateIndexes(newRe, newRe.isRemoved() ? IndexManager.REMOVE_ENTRY : IndexManager.UPDATE_ENTRY,
+ newRe.isRemoved() ? IndexProtocol.OTHER_OP : IndexProtocol.AFTER_UPDATE_OP);
+ }
+ done = true;
+ } finally {
+ if (result && indexUpdater != null) {
+ indexUpdater.postEvent(owner, event, newRe, done);
+ }
+ if (event != null) {
+ event.release();
+ event = null;
+ }
+ }
+ }
+ }
+ finally {
+ if (done && result) {
+ initialImagePutEntry(newRe);
+ }
+ if (!done) {
+ removeEntry(key, newRe, false);
+ if (owner.getIndexManager() != null) {
+ owner.getIndexManager().updateIndexes(newRe, IndexManager.REMOVE_ENTRY, IndexProtocol.OTHER_OP);
+ }
+ }
+ }
+ } // synchronized
+ } finally {
+ if (event != null) event.release();
+ OffHeapHelper.release(oldValue);
+ }
+ } catch(RegionClearedException rce) {
+ //Asif: do not issue any sort of callbacks
+ done = false;
+ cleared= true;
+ }catch(QueryException qe) {
+ done = false;
+ cleared= true;
+ }
+ finally {
+ if (done && !deferLRUCallback) {
+ lruUpdateCallback();
+ }
+ else if (!cleared) {
+ resetThreadLocals();
+ }
+ }
+ return result;
+ }
+
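+ // No-op hook invoked after a successful initial image put; subclasses may override.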
+ protected void initialImagePutEntry(RegionEntry newRe) {
+ }
+
+ boolean confirmEvictionDestroy(RegionEntry re)
+ {
+ /* We aren't in an LRU context, and should never get here */
+ Assert.assertTrue(false,
+ "Not an LRU region, cannot confirm LRU eviction operation");
+ return true;
+ }
+
+ public final boolean destroy(EntryEventImpl event,
+ boolean inTokenMode,
+ boolean duringRI,
+ boolean cacheWrite,
+ boolean isEviction,
+ Object expectedOldValue,
+ boolean removeRecoveredEntry)
+ throws CacheWriterException, EntryNotFoundException, TimeoutException {
+
+ final LocalRegion owner = _getOwner();
+
+ if (owner == null) {
+ Assert.assertTrue(false, "The owner for RegionMap " + this // "fix" for bug 32440
+ + " is null for event " + event);
+ }
+
+ boolean retry = true;
+
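+ // The operation is retried if the entry changes concurrently while we are
+ // synchronized on it (e.g. it is removed in phase 2 or its tombstone state changes).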
+ while (retry) {
+ retry = false;
+
+ boolean opCompleted = false;
+ boolean doPart3 = false;
+
+ // We need to acquire the region entry while holding the lock to avoid #45620.
+ // However, we also want to release the lock before distribution to prevent
+ // potential deadlocks. The outer try/finally ensures that the lock will be
+ // released without fail. I'm avoiding indenting just to preserve the ability
+ // to track diffs since the code is fairly complex.
+ boolean doUnlock = true;
+ lockForCacheModification(owner, event);
+ try {
+
+
+ RegionEntry re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, true, true);
+ RegionEntry tombstone = null;
+ boolean haveTombstone = false;
+ /*
+ * Execute the test hook runnable inline (not threaded) if it is not null.
+ */
+ if(null != testHookRunnableFor48182) {
+ testHookRunnableFor48182.run();
+ }
+
+ try {
+ if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT) && !(owner instanceof HARegion)) {
+ logger.trace(LogMarker.LRU_TOMBSTONE_COUNT,
+ "ARM.destroy() inTokenMode={}; duringRI={}; riLocalDestroy={}; withRepl={}; fromServer={}; concurrencyEnabled={}; isOriginRemote={}; isEviction={}; operation={}; re={}",
+ inTokenMode, duringRI, event.isFromRILocalDestroy(), owner.dataPolicy.withReplication(), event.isFromServer(),
+ owner.concurrencyChecksEnabled, event.isOriginRemote(), isEviction, event.getOperation(), re);
+ }
+ if (event.isFromRILocalDestroy()) {
+ // for RI local-destroy we don't want to keep tombstones.
+ // In order to simplify things we just set this recovery
+ // flag to true to force the entry to be removed
+ removeRecoveredEntry = true;
+ }
+ // the logic in this method is already very involved, and adding tombstone
+ // permutations to (re != null) greatly complicates it. So, we check
+ // for a tombstone here and, if found, pretend for a bit that the entry is null
+ if (re != null && re.isTombstone() && !removeRecoveredEntry) {
+ tombstone = re;
+ haveTombstone = true;
+ re = null;
+ }
+ IndexManager oqlIndexManager = owner.getIndexManager() ;
+ if (re == null) {
+ // we need to create an entry if in token mode or if we've received
+ // a destroy from a peer or WAN gateway and we need to retain version
+ // information for concurrency checks
+ boolean retainForConcurrency = (!haveTombstone
+ && (owner.dataPolicy.withReplication() || event.isFromServer())
+ && owner.concurrencyChecksEnabled
+ && (event.isOriginRemote() /* destroy received from other must create tombstone */
+ || event.isFromWANAndVersioned() /* wan event must create a tombstone */
+ || event.isBridgeEvent())); /* event from client must create a tombstone so client has a version # */
+ if (inTokenMode
+ || retainForConcurrency) {
+ // removeRecoveredEntry should be false in this case
+ RegionEntry newRe = getEntryFactory().createEntry(owner,
+ event.getKey(),
+ Token.REMOVED_PHASE1);
+ // Fix for Bug #44431. We do NOT want to update the region and wait
+ // later for index INIT, as region.clear() can cause inconsistency if
+ // it happens in parallel, since it also does index INIT.
+ if (oqlIndexManager != null) {
+ oqlIndexManager.waitForIndexInit();
+ }
+ try {
+ synchronized (newRe) {
+ RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
+ while (!opCompleted && oldRe != null) {
+ synchronized (oldRe) {
+ if (oldRe.isRemovedPhase2()) {
+ oldRe = putEntryIfAbsent(event.getKey(), newRe);
+ if (oldRe != null) {
+ owner.getCachePerfStats().incRetries();
+ }
+ } else {
+ event.setRegionEntry(oldRe);
+
+ // Last transaction related eviction check. This should
+ // prevent
+ // transaction conflict (caused by eviction) when the entry
+ // is being added to transaction state.
+ if (isEviction) {
+ if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
+ opCompleted = false;
+ return opCompleted;
+ }
+ }
+ try {
+ //if concurrency checks are enabled, destroy will
+ //set the version tag
+ boolean destroyed = destroyEntry(oldRe, event, inTokenMode, cacheWrite, expectedOldValue, false, removeRecoveredEntry);
+ if (destroyed) {
+ if (retainForConcurrency) {
+ owner.basicDestroyBeforeRemoval(oldRe, event);
+ }
+ owner.basicDestroyPart2(oldRe, event, inTokenMode,
+ false /* conflict with clear */, duringRI, true);
+ lruEntryDestroy(oldRe);
+ doPart3 = true;
+ }
+ }
+ catch (RegionClearedException rce) {
+ // Ignore. The exception will ensure that we do not update
+ // the LRU List
+ owner.basicDestroyPart2(oldRe, event, inTokenMode,
+ true/* conflict with clear */, duringRI, true);
+ doPart3 = true;
+ } catch (ConcurrentCacheModificationException ccme) {
+ VersionTag tag = event.getVersionTag();
+ if (tag != null && tag.isTimeStampUpdated()) {
+ // Notify gateways of new time-stamp.
+ owner.notifyTimestampsToGateways(event);
+ }
+ throw ccme;
+ }
+ re = oldRe;
+ opCompleted = true;
+ }
+ } // synchronized oldRe
+ } // while
+ if (!opCompleted) {
+ // The following try has a finally that cleans up the newRe.
+ // This is only needed if newRe was added to the map which only
+ // happens if we didn't get completed with oldRe in the above while loop.
+ try {
+ re = newRe;
+ event.setRegionEntry(newRe);
+
+ try {
+ //if concurrency checks are enabled, destroy will
+ //set the version tag
+ if (isEviction) {
+ opCompleted = false;
+ return opCompleted;
+ }
+ opCompleted = destroyEntry(newRe, event, inTokenMode, cacheWrite, expectedOldValue, true, removeRecoveredEntry);
+ if (opCompleted) {
+ // This is a new entry that was created because we are in
+ // token mode or are accepting a destroy operation by adding
+ // a tombstone. There is no oldValue, so we don't need to
+ // call updateSizeOnRemove
+ // owner.recordEvent(event);
+ event.setIsRedestroyedEntry(true); // native clients need to know if the entry didn't exist
+ if (retainForConcurrency) {
+ owner.basicDestroyBeforeRemoval(oldRe, event);
+ }
+ owner.basicDestroyPart2(newRe, event, inTokenMode,
+ false /* conflict with clear */, duringRI, true);
+ doPart3 = true;
+ }
+ }
+ catch (RegionClearedException rce) {
+ // Ignore. The exception will ensure that we do not update
+ // the LRU List
+ opCompleted = true;
+ EntryLogger.logDestroy(event);
+ owner.basicDestroyPart2(newRe, event, inTokenMode, true /* conflict with clear*/, duringRI, true);
+ doPart3 = true;
+ } catch (ConcurrentCacheModificationException ccme) {
+ VersionTag tag = event.getVersionTag();
+ if (tag != null && tag.isTimeStampUpdated()) {
+ // Notify gateways of new time-stamp.
+ owner.notifyTimestampsToGateways(event);
+ }
+ throw ccme;
+ }
+ // Note no need for LRU work since the entry is destroyed
+ // and will be removed when gii completes
+ } finally {
+ if (!opCompleted && !haveTombstone /* to fix bug 51583 do this for all operations */ ) {
+ removeEntry(event.getKey(), newRe, false);
+ }
+ if (!opCompleted && isEviction) {
+ removeEntry(event.getKey(), newRe, false);
+ }
+ }
+ } // !opCompleted
+ } // synchronized newRe
+ } finally {
+ if (oqlIndexManager != null) {
+ oqlIndexManager.countDownIndexUpdaters();
+ }
+ }
+ } // inTokenMode or tombstone creation
+ else {
+ if (!isEviction || owner.concurrencyChecksEnabled) {
+ // The following ensures that there is not a concurrent operation
+ // on the entry and leaves behind a tombstone if concurrencyChecksEnabled.
+ // It fixes bug #32467 by propagating the destroy to the server even though
+ // the entry isn't in the client
+ RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner, event.getKey(),
+ Token.REMOVED_PHASE1);
+ synchronized(newRe) {
+ if (haveTombstone && !tombstone.isTombstone()) {
+ // we have to check this again under synchronization since it may have changed
+ retry = true;
+ //retryEntry = tombstone; // leave this in place for debugging
+ continue;
+ }
+ re = (RegionEntry)_getMap().putIfAbsent(event.getKey(), newRe);
+ if (re != null && re != tombstone) {
+ // concurrent change - try again
+ retry = true;
+ //retryEntry = tombstone; // leave this in place for debugging
+ continue;
+ }
+ else if (!isEviction) {
+ boolean throwex = false;
+ EntryNotFoundException ex = null;
+ try {
+ if (!cacheWrite) {
+ throwex = true;
+ } else {
+ try {
+ if (!removeRecoveredEntry) {
+ throwex = !owner.bridgeWriteBeforeDestroy(event, expectedOldValue);
+ }
+ } catch (EntryNotFoundException e) {
+ throwex = true;
+ ex = e;
+ }
+ }
+ if (throwex) {
+ if (!event.isOriginRemote() && !event.getOperation().isLocal() &&
+ (event.isFromBridgeAndVersioned() || // if this is a replayed client event that already has a version
+ event.isFromWANAndVersioned())) { // or if this is a WAN event that has been applied in another system
+ // we must distribute these since they will update the version information in peers
+ if (logger.isDebugEnabled()) {
+ logger.debug("ARM.destroy is allowing wan/client destroy of {} to continue", event.getKey());
+ }
+ throwex = false;
+ event.setIsRedestroyedEntry(true);
+ // Distribution of this op happens on re and re might be null here before
+ // distributing this destroy op.
+ if (re == null) {
+ re = newRe;
+ }
+ doPart3 = true;
+ }
+ }
+ if (throwex) {
+ if (ex == null) {
+ // Fix for 48182, check cache state and/or region state before sending entry not found.
+ // This is from the server and any exceptions will propagate to the client.
+ owner.checkEntryNotFound(event.getKey());
+ } else {
+ throw ex;
+ }
+ }
+ } finally {
+ // either remove the entry or leave a tombstone
+ try {
+ if (!event.isOriginRemote() && event.getVersionTag() != null && owner.concurrencyChecksEnabled) {
+ // this shouldn't fail since we just created the entry.
+ // it will either generate a tag or apply a server's version tag
+ processVersionTag(newRe, event);
+ if (doPart3) {
+ owner.generateAndSetVersionTag(event, newRe);
+ }
+ try {
+ owner.recordEvent(event);
+ newRe.makeTombstone(owner, event.getVersionTag());
+ } catch (RegionClearedException e) {
+ // that's okay - when writing a tombstone into a disk, the
+ // region has been cleared (including this tombstone)
+ }
+ opCompleted = true;
+ // lruEntryCreate(newRe);
+ } else if (!haveTombstone) {
+ try {
+ assert newRe != tombstone;
+ newRe.setValue(owner, Token.REMOVED_PHASE2);
+ removeEntry(event.getKey(), newRe, false);
+ } catch (RegionClearedException e) {
+ // that's okay - we just need to remove the new entry
+ }
+ } else if (event.getVersionTag() != null ) { // haveTombstone - update the tombstone version info
+ processVersionTag(tombstone, event);
+ if (doPart3) {
+ owner.generateAndSetVersionTag(event, newRe);
+ }
+ // This is not a conflict; we need to persist the tombstone again with the new version tag
+ try {
+ tombstone.setValue(owner, Token.TOMBSTONE);
+ } catch (RegionClearedException e) {
+ // that's okay - when writing a tombstone into a disk, the
+ // region has been cleared (including this tombstone)
+ }
+ owner.recordEvent(event);
+ owner.rescheduleTombstone(tombstone, event.getVersionTag());
+ owner.basicDestroyPart2(tombstone, event, inTokenMode, true /* conflict with clear*/, duringRI, true);
+ opCompleted = true;
+ }
+ } catch (ConcurrentCacheModificationException ccme) {
+ VersionTag tag = event.getVersionTag();
+ if (tag != null && tag.isTimeStampUpdated()) {
+ // Notify gateways of new time-stamp.
+ owner.notifyTimestampsToGateways(event);
+ }
+ throw ccme;
+ }
+ }
+ }
+ } // synchronized(newRe)
+ }
+ }
+ } // no current entry
+ else { // current entry exists
+ if (oqlIndexManager != null) {
+ oqlIndexManager.waitForIndexInit();
+ }
+ try {
+ synchronized (re) {
+ // if the entry is a tombstone and the event is from a peer or a client
+ // then we allow the operation to be performed so that we can update the
+ // version stamp. Otherwise we would retain an old version stamp and may allow
+ // an operation that is older than the destroy() to be applied to the cache
+ // Bug 45170: If removeRecoveredEntry, we treat tombstone as regular entry to be deleted
+ boolean createTombstoneForConflictChecks = (owner.concurrencyChecksEnabled
+ && (event.isOriginRemote() || event.getContext() != null || removeRecoveredEntry));
+ if (!re.isRemoved() || createTombstoneForConflictChecks) {
+ if (re.isRemovedPhase2()) {
+ retry = true;
+ continue;
+ }
+ if (!event.isOriginRemote() && event.getOperation().isExpiration()) {
+ // If this expiration started locally then only do it if the RE is not being used by a tx.
+ if (re.isInUseByTransaction()) {
+ opCompleted = false;
+ return opCompleted;
+ }
+ }
+ event.setRegionEntry(re);
+
+ // See comment above about eviction checks
+ if (isEviction) {
+ assert expectedOldValue == null;
+ if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
+ opCompleted = false;
+ return opCompleted;
+ }
+ }
+
+ boolean removed = false;
+ try {
+ opCompleted = destroyEntry(re, event, inTokenMode, cacheWrite, expectedOldValue, false, removeRecoveredEntry);
+ if (opCompleted) {
+ // It is very, very important for Partitioned Regions to keep
+ // the entry in the map until after distribution occurs so that other
+ // threads performing a create on this entry wait until the destroy
+ // distribution is finished.
+ // keeping backup copies consistent. Fix for bug 35906.
+ // -- mthomas 07/02/2007 <-- how about that date, kinda cool eh?
+ owner.basicDestroyBeforeRemoval(re, event);
+
+ // do this before basicDestroyPart2 to fix bug 31786
+ if (!inTokenMode) {
+ if ( re.getVersionStamp() == null) {
+ re.removePhase2();
+ removeEntry(event.getKey(), re, true, event, owner,
+ indexUpdater);
+ removed = true;
+ }
+ }
+ if (inTokenMode && !duringRI) {
+ event.inhibitCacheListenerNotification(true);
+ }
+ doPart3 = true;
+ owner.basicDestroyPart2(re, event, inTokenMode, false /* conflict with clear*/, duringRI, true);
+ // if (!re.isTombstone() || isEviction) {
+ lruEntryDestroy(re);
+ // } else {
+ // lruEntryUpdate(re);
+ // lruUpdateCallback = true;
+ // }
+ } else {
+ if (!inTokenMode) {
+ EntryLogger.logDestroy(event);
+ owner.recordEvent(event);
+ if (re.getVersionStamp() == null) {
+ re.removePhase2();
+ removeEntry(event.getKey(), re, true, event, owner,
+ indexUpdater);
+ lruEntryDestroy(re);
+ } else {
+ if (re.isTombstone()) {
+ // the entry is already a tombstone, but we're destroying it
+ // again, so we need to reschedule the tombstone's expiration
+ if (event.isOriginRemote()) {
+ owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
+ }
+ }
+ }
+ lruEntryDestroy(re);
+ opCompleted = true;
+ }
+ }
+ }
+ catch (RegionClearedException rce) {
+ // Ignore. The exception will ensure that we do not update
+ // the LRU List
+ opCompleted = true;
+ owner.recordEvent(event);
+ if (inTokenMode && !duringRI) {
+ event.inhibitCacheListenerNotification(true);
+ }
+ owner.basicDestroyPart2(re, event, inTokenMode, true /*conflict with clear*/, duringRI, true);
+ doPart3 = true;
+ }
+ finally {
+ if (re.isRemoved() && !re.isTombstone()) {
+ if (!removed) {
+ removeEntry(event.getKey(), re, true, event, owner,
+ indexUpdater);
+ }
+ }
+ }
+ } // !isRemoved
+ else { // already removed
+ if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) {
+ // For HDFS region there may be a race with eviction
+ // so retry the operation. fixes bug 49150
+ retry = true;
+ continue;
+ }
+ if (re.isTombstone() && event.getVersionTag() != null) {
+ // if we're dealing with a tombstone and this is a remote event
+ // (e.g., from cache client update thread) we need to update
+ // the tombstone's version information
+ // TODO use destroyEntry() here
+ processVersionTag(re, event);
+ try {
+ re.makeTombstone(owner, event.getVersionTag());
+ } catch (RegionClearedException e) {
+ // that's okay - when writing a tombstone into a disk, the
+ // region has been cleared (including this tombstone)
+ }
+ }
+ if (expectedOldValue != null) {
+ // if re is removed then there is no old value, so return false
+ return false;
+ }
+
+ if (!inTokenMode && !isEviction) {
+ owner.checkEntryNotFound(event.getKey());
+ }
+ }
+ } // synchronized re
+ } catch (ConcurrentCacheModificationException ccme) {
+ VersionTag tag = event.getVersionTag();
+ if (tag != null && tag.isTimeStampUpdated()) {
+ // Notify gateways of new time-stamp.
+ owner.notifyTimestampsToGateways(event);
+ }
+ throw ccme;
+ } finally {
+ if (oqlIndexManager != null) {
+ oqlIndexManager.countDownIndexUpdaters();
+ }
+ }
+ // No need to call lruUpdateCallback since the only lru action
+ // we may have taken was lruEntryDestroy. This fixes bug 31759.
+
+ } // current entry exists
+ if(opCompleted) {
+ EntryLogger.logDestroy(event);
+ }
+ return opCompleted;
+ }
+ finally {
+ releaseCacheModificationLock(owner, event);
+ doUnlock = false;
+
+ try {
+ // If concurrency conflict is there and event contains gateway version tag then
+ // do NOT distribute.
+ if (event.isConcurrencyConflict() &&
+ (event.getVersionTag() != null && event.getVersionTag().isGatewayTag())) {
+ doPart3 = false;
+ }
+ // distribution and listener notification
+ if (doPart3) {
+ owner.basicDestroyPart3(re, event, inTokenMode, duringRI, true, expectedOldValue);
+ }
+ } finally {
+ if (opCompleted) {
+ if (re != null) {
+ owner.cancelExpiryTask(re);
+ } else if (tombstone != null) {
+ owner.cancelExpiryTask(tombstone);
+ }
+ }
+ }
+ }
+
+ } finally { // failsafe on the read lock...see comment above
+ if (doUnlock) {
+ releaseCacheModificationLock(owner, event);
+ }
+ }
+ } // retry loop
+ return false;
+ }
+
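+ // Applies a transactional destroy to this map: the entry is turned into a
+ // tombstone or removed (DESTROYED token in token mode) and the callback event
+ // is either dispatched or added to pendingCallbacks for later dispatch.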
+ public final void txApplyDestroy(Object key, TransactionId txId,
+ TXRmtEvent txEvent, boolean inTokenMode, boolean inRI, Operation op,
+ EventID eventId, Object aCallbackArgument,List<EntryEventImpl> pendingCallbacks,FilterRoutingInfo filterRoutingInfo,ClientProxyMembershipID bridgeContext,
+ boolean isOriginRemote, TXEntryState txEntryState, VersionTag versionTag, long tailKey)
+ {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ final LocalRegion owner = _getOwner();
+ owner.checkBeforeEntrySync(txEvent);
+
+ final boolean isRegionReady = !inTokenMode;
+ final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId());
+ boolean cbEventInPending = false;
+ lockForTXCacheModification(owner, versionTag);
+ IndexManager oqlIndexManager = owner.getIndexManager() ;
+ try {
+ RegionEntry re = getEntry(key);
+ if (re != null) {
+ // Fix for Bug #44431. We do NOT want to update the region and wait
+ // later for index INIT, as region.clear() can cause inconsistency if
+ // it happens in parallel, since it also does index INIT.
+ if (oqlIndexManager != null) {
+ oqlIndexManager.waitForIndexInit();
+ }
+ try {
+ synchronized (re) {
+ if (!re.isRemoved() || re.isTombstone()) {
+ EntryEventImpl sqlfEvent = null;
+ @Retained @Released Object oldValue = re.getValueInVM(owner);
+ try {
+ final int oldSize = owner.calculateRegionEntryValueSize(re);
+ // Create an entry event only if the calling context is
+ // the receipt of a TXCommitMessage AND there are callbacks
+ // installed for this region
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/, isRegionReady || inRI);
+ EntryEventImpl cbEvent = createCBEvent(owner, op,
+ key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+ try {
+
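+ // if an external index updater is installed, the callback event is routed
+ // to removeEntry below (as sqlfEvent); otherwise attach the region entry
+ // and, for bucket regions, generate the WAN event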
+ if (/* owner.isUsedForPartitionedRegionBucket() && */
+ indexUpdater != null) {
+ sqlfEvent = cbEvent;
+ } else {
+ if (owner.isUsedForPartitionedRegionBucket()) {
+ txHandleWANEvent(owner, cbEvent, txEntryState);
+ }
+ cbEvent.setRegionEntry(re);
+ }
+ cbEvent.setOldValue(oldValue);
+ if (isDebugEnabled) {
+ logger.debug("txApplyDestroy cbEvent={}", cbEvent);
+ }
+
+ txRemoveOldIndexEntry(Operation.DESTROY, re);
+ if (txEvent != null) {
+ txEvent.addDestroy(owner, re, re.getKey(),aCallbackArgument);
+ }
+ boolean clearOccured = false;
+ try {
+ processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
+ if (inTokenMode) {
+ if (oldValue == Token.TOMBSTONE) {
+ owner.unscheduleTombstone(re);
+ }
+ re.setValue(owner, Token.DESTROYED);
+ }
+ else {
+ if (!re.isTombstone()) {
+ if (sqlfEvent != null) {
+ re.removePhase1(owner, false); // fix for bug 43063
+ re.removePhase2();
+ removeEntry(key, re, true, sqlfEvent, owner, indexUpdater);
+ } else {
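+ // when concurrency checks apply and a version tag is available, keep a
+ // tombstone carrying the tag instead of removing the entry outright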
+ if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
+ re.makeTombstone(owner, cbEvent.getVersionTag());
+ } else {
+ re.removePhase1(owner, false); // fix for bug 43063
+ re.removePhase2();
+ removeEntry(key, re, false);
+ }
+ }
+ } else {
+ owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
+ }
+ }
+ EntryLogger.logTXDestroy(_getOwnerObject(), key);
+ owner.updateSizeOnRemove(key, oldSize);
+ }
+ catch (RegionClearedException rce) {
+ clearOccured = true;
+ }
+ owner.txApplyDestroyPart2(re, re.getKey(), inTokenMode,
+ clearOccured /* Clear Conflicting with the operation */);
+ if (invokeCallbacks) {
+ switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+ if(pendingCallbacks==null) {
+ owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
+ cbEvent, true/*callDispatchListenerEvent*/);
+ } else {
+ pendingCallbacks.add(cbEvent);
+ cbEventInPending = true;
+ }
+ }
+ if (!clearOccured) {
+ lruEntryDestroy(re);
+ }
+ if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent!= null) {
+ txEntryState.setVersionTag(cbEvent.getVersionTag());
+ }
+ } finally {
+ if (!cbEventInPending) cbEvent.release();
+ }
+ } finally {
+ OffHeapHelper.release(oldValue);
+ }
+ }
+ }
+ } finally {
+ if (oqlIndexManager != null) {
+ oqlIndexManager.countDownIndexUpdaters();
+ }
+ }
+ } else if (inTokenMode || owner.concurrencyChecksEnabled) {
+ // treat token mode and re == null as the same case, since we now want to
+ // generate versions and tombstones for destroys
+ boolean dispatchListenerEvent = inTokenMode;
+ boolean opCompleted = false;
+ RegionEntry newRe = getEntryFactory().createEntry(owner, key,
+ Token.DESTROYED);
+ if ( oqlIndexManager != null) {
+ oqlIndexManager.waitForIndexInit();
+ }
+ EntryEventImpl cbEvent = null;
+ try {
+ synchronized (newRe) {
+ RegionEntry oldRe = putEntryIfAbsent(key, newRe);
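+ // if another thread beat us to the insert, synchronize on its entry;
+ // if that entry was concurrently removed (phase 2), retry the insert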
+ while (!opCompleted && oldRe != null) {
+ synchronized (oldRe) {
+ if (oldRe.isRemovedPhase2()) {
+ oldRe = putEntryIfAbsent(key, newRe);
+ if (oldRe != null) {
+ owner.getCachePerfStats().incRetries();
+ }
+ }
+ else {
+ try {
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
+ cbEvent = createCBEvent(owner, op,
+ key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+ try {
+ cbEvent.setRegionEntry(oldRe);
+ cbEvent.setOldValue(Token.NOT_AVAILABLE);
+ if (isDebugEnabled) {
+ logger.debug("txApplyDestroy token mode cbEvent={}", cbEvent);
+ }
+ if (owner.isUsedForPartitionedRegionBucket()) {
+ txHandleWANEvent(owner, cbEvent, txEntryState);
+ }
+ processAndGenerateTXVersionTag(owner, cbEvent, oldRe, txEntryState);
+ if (invokeCallbacks) {
+ switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+ if(pendingCallbacks==null) {
+ owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
+ cbEvent, dispatchListenerEvent);
+ } else {
+ pendingCallbacks.add(cbEvent);
+ cbEventInPending = true;
+ }
+ }
+ int oldSize = 0;
+ boolean wasTombstone = oldRe.isTombstone();
+ {
+ if (!wasTombstone) {
+ oldSize = owner.calculateRegionEntryValueSize(oldRe);
+ }
+ }
+ oldRe.setValue(owner, Token.DESTROYED);
+ EntryLogger.logTXDestroy(_getOwnerObject(), key);
+ if (wasTombstone) {
+ owner.unscheduleTombstone(oldRe);
+ }
+ owner.updateSizeOnRemove(oldRe.getKey(), oldSize);
+ owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
+ false /* Clear Conflicting with the operation */);
+ lruEntryDestroy(oldRe);
+ } finally {
+ if (!cbEventInPending) cbEvent.release();
+ }
+ }
+ catch (RegionClearedException rce) {
+ owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
+ true /* Clear Conflicting with the operation */);
+ }
+ if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
+ oldRe.makeTombstone(owner, cbEvent.getVersionTag());
+ } else if (!inTokenMode) {
+ // only remove for NORMAL regions if they do not generate versions; see bug 51781
+ oldRe.removePhase1(owner, false); // fix for bug 43063
+ oldRe.removePhase2();
+ removeEntry(key, oldRe, false);
+ }
+ opCompleted = true;
+ }
+ }
+ }
+ if (!opCompleted) {
+ // already has value set to Token.DESTROYED
+ opCompleted = true;
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
+ cbEvent = createCBEvent(owner, op,
+ key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+ try {
+ cbEvent.setRegionEntry(newRe);
+ cbEvent.setOldValue(Token.NOT_AVAILABLE);
+ if (isDebugEnabled) {
+ logger.debug("txApplyDestroy token mode cbEvent={}", cbEvent);
+ }
+ if (owner.isUsedForPartitionedRegionBucket()) {
+ txHandleWANEvent(owner, cbEvent, txEntryState);
+ }
+ processAndGenerateTXVersionTag(owner, cbEvent, newRe, txEntryState);
+ if (invokeCallbacks) {
+ switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+ if(pendingCallbacks==null) {
+ owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
+ cbEvent, dispatchListenerEvent);
+ } else {
+ pendingCallbacks.add(cbEvent);
+ cbEventInPending = true;
+ }
+ }
+ EntryLogger.logTXDestroy(_getOwnerObject(), key);
+ owner.updateSizeOnCreate(newRe.getKey(), 0);
+ if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
+ newRe.makeTombstone(owner, cbEvent.getVersionTag());
+ } else if (!inTokenMode) {
+ // only remove for NORMAL regions if they do not generate versions; see bug 51781
+ newRe.removePhase1(owner, false); // fix for bug 43063
+ newRe.removePhase2();
+ removeEntry(key, newRe, false);
+ }
+ owner.txApplyDestroyPart2(newRe, newRe.getKey(), inTokenMode,
+ false /*clearConflict*/);
+ // Note: no need for LRU work since the entry is destroyed
+ // and will be removed when GII completes
+ } finally {
+ if (!cbEventInPending) cbEvent.release();
+ }
+ }
+ if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) {
+ txEntryState.setVersionTag(cbEvent.getVersionTag());
+ }
+ }
+ } catch (RegionClearedException e) {
+ // TODO
+ } finally {
+ if (oqlIndexManager != null) {
+ oqlIndexManager.countDownIndexUpdaters();
+ }
+ }
+ } else if (re == null) {
+ // Fix for bug #43594.
+ // When a bucket region is re-created, the destroy may already have been
+ // applied on the initial image provider, so the region entry can be absent.
+ // Still notify clients with client events.
+ EntryEventImpl cbEvent = createCBEvent(owner, op,
+ key, null, txId, txEvent, eventId, aCallbackArgument,
+ filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+ try {
+ if (owner.isUsedForPartitionedRegionBucket()) {
+ txHandleWANEvent(owner, cbEvent, txEntryState);
+ }
+ switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+ if (pendingCallbacks == null) {
+ owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,cbEvent,false);
+ } else {
+ pendingCallbacks.add(cbEvent);
+ cbEventInPending = true;
+ }
+ } finally {
+ if (!cbEventInPending) cbEvent.release();
+ }
+ }
+ } catch( DiskAccessException dae) {
+ owner.handleDiskAccessException(dae);
+ throw dae;
+ }
+ finally {
+ releaseTXCacheModificationLock(owner, versionTag);
+ }
+ }
+
+ /**
+ * If true, then invalidate operations that throw EntryNotFoundException,
+ * or that target an already-invalid entry, will first call afterInvalidate on CacheListeners.
+ * The old value on the event passed to afterInvalidate will be null.
+ */
+ public static boolean FORCE_INVALIDATE_EVENT = Boolean.getBoolean("gemfire.FORCE_INVALIDATE_EVENT");
+
+ /**
+ * If the FORCE_INVALIDATE_EVENT flag is true
+ * then invoke callbacks on the given event.
+ */
+ void forceInvalidateEvent(EntryEventImpl event) {
+ if (FORCE_INVALIDATE_EVENT) {
+ event.invokeCallbacks(_getOwner(), false, false);
+ }
+ }
+
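+ /**
+ * Invalidates the entry identified by the given event, optionally invoking
+ * callbacks. When FORCE_INVALIDATE_EVENT is set, listeners may also be
+ * notified for entries that are missing or already invalid (see
+ * forceInvalidateEvent above).
+ */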
+ public final boolean invalidate(EntryEventImpl event,
+ boolean invokeCallbacks, boolean forceNewEntry, boolean forceCallbacks)
+ throws EntryNotFoundException
+ {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ final LocalRegion owner = _getOwner();
+ if (owner == null) {
+ // "fix" for bug 32440
+ Ass
<TRUNCATED>