You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:39 UTC
[77/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index 0000000,dd33b15..558ea37
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@@ -1,0 -1,2243 +1,2243 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package com.gemstone.gemfire.internal.cache;
+
+ import java.io.IOException;
+ import java.util.Arrays;
+
+ import org.apache.logging.log4j.Logger;
+
+ import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+ import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE;
+
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.InvalidDeltaException;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.EntryEvent;
+ import com.gemstone.gemfire.cache.EntryNotFoundException;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.TimeoutException;
+ import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
+ import com.gemstone.gemfire.cache.query.QueryException;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
+ import com.gemstone.gemfire.cache.util.GatewayConflictHelper;
+ import com.gemstone.gemfire.cache.util.GatewayConflictResolver;
+ import com.gemstone.gemfire.distributed.internal.DM;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.ByteArrayDataInput;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.InternalDataSerializer;
+ import com.gemstone.gemfire.internal.InternalStatisticsDisabledException;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
+ import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
+ import com.gemstone.gemfire.internal.cache.persistence.DiskStoreID;
+ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.lang.StringUtils;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.Chunk;
-import com.gemstone.gemfire.internal.offheap.ChunkWithHeapForm;
-import com.gemstone.gemfire.internal.offheap.GemFireChunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunkWithHeapForm;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
+ import com.gemstone.gemfire.internal.offheap.OffHeapCachedDeserializable;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
+ import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.util.BlobHelper;
+ import com.gemstone.gemfire.internal.util.Versionable;
+ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
+ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
+ import com.gemstone.gemfire.pdx.PdxInstance;
+ import com.gemstone.gemfire.pdx.PdxSerializable;
+ import com.gemstone.gemfire.pdx.PdxSerializationException;
+ import com.gemstone.gemfire.pdx.PdxSerializer;
+ import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
+ import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
+
+ /**
+ * Abstract implementation class of RegionEntry interface.
+ * This is the topmost implementation class so common behavior
+ * lives here.
+ *
+ * @since 3.5.1
+ *
+ * @author Darrel Schneider
+ * @author bruce
+ *
+ */
+ public abstract class AbstractRegionEntry implements RegionEntry,
+ HashEntry<Object, Object> {
+
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * Whether to disable last access time update when a put occurs. The default
+ * is false (enable last access time update on put). To disable it, set the
+ * 'gemfire.disableAccessTimeUpdateOnPut' system property.
+ */
+ protected static final boolean DISABLE_ACCESS_TIME_UPDATE_ON_PUT = Boolean
+ .getBoolean("gemfire.disableAccessTimeUpdateOnPut");
+
+ /*
+ * Flags for a Region Entry.
+ * These flags are stored in the msb of the long used to also store the lastModicationTime.
+ */
+ private static final long VALUE_RESULT_OF_SEARCH = 0x01L<<56;
+ private static final long UPDATE_IN_PROGRESS = 0x02L<<56;
+ private static final long TOMBSTONE_SCHEDULED = 0x04L<<56;
+ private static final long LISTENER_INVOCATION_IN_PROGRESS = 0x08L<<56;
+ /** used for LRUEntry instances. */
+ protected static final long RECENTLY_USED = 0x10L<<56;
+ /** used for LRUEntry instances. */
+ protected static final long EVICTED = 0x20L<<56;
+ /**
+ * Set if the entry is being used by a transactions.
+ * Some features (eviction and expiration) will not modify an entry when a tx is using it
+ * to prevent the tx to fail do to conflict.
+ */
+ protected static final long IN_USE_BY_TX = 0x40L<<56;
+
+
+ protected static final long MARKED_FOR_EVICTION = 0x80L<<56;
+ // public Exception removeTrace; // debugging hot loop in AbstractRegionMap.basicPut()
+
+ protected AbstractRegionEntry(RegionEntryContext context,
+ @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object value) {
+
+ setValue(context,this.prepareValueForCache(context, value, false),false);
+ // setLastModified(System.currentTimeMillis()); [bruce] this must be set later so we can use ==0 to know this is a new entry in checkForConflicts
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ ////////////////////////// instance methods /////////////////////////
+ /////////////////////////////////////////////////////////////////////
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IMSE_DONT_CATCH_IMSE")
+ public boolean dispatchListenerEvents(final EntryEventImpl event) throws InterruptedException {
+ final LocalRegion rgn = event.getRegion();
+
+ if (event.callbacksInvoked()) {
+ return true;
+ }
+
+ // don't wait for certain events to reach the head of the queue before
+ // dispatching listeners. However, we must not notify the gateways for
+ // remote-origin ops out of order. Otherwise the other systems will have
+ // inconsistent content.
+
+ event.setCallbacksInvokedByCurrentThread();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} dispatching event {}", this, event);
+ }
+ // All the following code that sets "thr" is to workaround
+ // spurious IllegalMonitorStateExceptions caused by JVM bugs.
+ try {
+ // call invokeCallbacks while synced on RegionEntry
+ event.invokeCallbacks(rgn, event.inhibitCacheListenerNotification(), false);
+ return true;
+
+ } finally {
+ if (isRemoved() && !isTombstone() && !event.isEvicted()) {
+ // Phase 2 of region entry removal is done here. The first phase is done
+ // by the RegionMap. It is unclear why this code is needed. ARM destroy
+ // does this also and we are now doing it as phase3 of the ARM destroy.
+ removePhase2();
+ rgn.getRegionMap().removeEntry(event.getKey(), this, true, event, rgn, rgn.getIndexUpdater());
+ }
+ }
+ }
+
+ public long getLastAccessed() throws InternalStatisticsDisabledException {
+ throw new InternalStatisticsDisabledException();
+ }
+
+ public long getHitCount() throws InternalStatisticsDisabledException {
+ throw new InternalStatisticsDisabledException();
+ }
+
+ public long getMissCount() throws InternalStatisticsDisabledException {
+ throw new InternalStatisticsDisabledException();
+ }
+
+ protected void setLastModified(long lastModified) {
+ _setLastModified(lastModified);
+ }
+
+ public void txDidDestroy(long currTime) {
+ setLastModified(currTime);
+ }
+
+ public final void updateStatsForPut(long lastModifiedTime) {
+ setLastModified(lastModifiedTime);
+ }
+
+ public void setRecentlyUsed() {
+ // do nothing by default; only needed for LRU
+ }
+
+ public void updateStatsForGet(boolean hit, long time) {
+ // nothing needed
+ }
+
+ public void resetCounts() throws InternalStatisticsDisabledException {
+ throw new InternalStatisticsDisabledException();
+ }
+
+ public void _removePhase1() {
+ _setValue(Token.REMOVED_PHASE1);
+ // debugging for 38467 (hot thread in ARM.basicUpdate)
+ // this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread());
+ }
+ public void removePhase1(LocalRegion r, boolean isClear) throws RegionClearedException {
+ _removePhase1();
+ }
+
+ public void removePhase2() {
+ _setValue(Token.REMOVED_PHASE2);
+ // this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread());
+ }
+
+ public void makeTombstone(LocalRegion r, VersionTag version) throws RegionClearedException {
+ assert r.getVersionVector() != null;
+ assert version != null;
+ if (r.getServerProxy() == null &&
+ r.getVersionVector().isTombstoneTooOld(version.getMemberID(), version.getRegionVersion())) {
+ // distributed gc with higher vector version preempts this operation
+ if (!isTombstone()) {
+ setValue(r, Token.TOMBSTONE);
+ r.incTombstoneCount(1);
+ }
+ r.getRegionMap().removeTombstone(this, version, false, true);
+ } else {
+ if (isTombstone()) {
+ // unschedule the old tombstone
+ r.unscheduleTombstone(this);
+ }
+ setRecentlyUsed();
+ boolean newEntry = (getValueAsToken() == Token.REMOVED_PHASE1);
+ setValue(r, Token.TOMBSTONE);
+ r.scheduleTombstone(this, version);
+ if (newEntry) {
+ // bug #46631 - entry count is decremented by scheduleTombstone but this is a new entry
+ r.getCachePerfStats().incEntryCount(1);
+ }
+ }
+ }
+
+
+ @Override
+ public void setValueWithTombstoneCheck(@Unretained Object v, EntryEvent e) throws RegionClearedException {
+ if (v == Token.TOMBSTONE) {
+ makeTombstone((LocalRegion)e.getRegion(), ((EntryEventImpl)e).getVersionTag());
+ } else {
+ setValue((LocalRegion)e.getRegion(), v, (EntryEventImpl)e);
+ }
+ }
+
+ /**
+ * Return true if the object is removed.
+ *
+ * TODO this method does NOT return true if the object
+ * is Token.DESTROYED. dispatchListenerEvents relies on that
+ * fact to avoid removing destroyed tokens from the map.
+ * We should refactor so that this method calls Token.isRemoved,
+ * and places that don't want a destroyed Token can explicitly check
+ * for a DESTROY token.
+ */
+ public final boolean isRemoved() {
+ Token o = getValueAsToken();
+ return (o == Token.REMOVED_PHASE1) || (o == Token.REMOVED_PHASE2) || (o == Token.TOMBSTONE);
+ }
+
+ public final boolean isDestroyedOrRemoved() {
+ return Token.isRemoved(getValueAsToken());
+ }
+
+ public final boolean isDestroyedOrRemovedButNotTombstone() {
+ Token o = getValueAsToken();
+ return o == Token.DESTROYED || o == Token.REMOVED_PHASE1 || o == Token.REMOVED_PHASE2;
+ }
+
+ public final boolean isTombstone() {
+ return getValueAsToken() == Token.TOMBSTONE;
+ }
+
+ public final boolean isRemovedPhase2() {
+ return getValueAsToken() == Token.REMOVED_PHASE2;
+ }
+
+ public boolean fillInValue(LocalRegion region,
+ @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry dst,
+ ByteArrayDataInput in,
+ DM mgr)
+ {
+ dst.setSerialized(false); // starting default value
+
+ @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) final Object v;
+ if (isTombstone()) {
+ v = Token.TOMBSTONE;
+ } else {
+ v = getValue(region); // OFFHEAP: need to incrc, copy bytes, decrc
+ if (v == null) {
+ return false;
+ }
+ }
+
+ final boolean isEagerDeserialize = dst.isEagerDeserialize();
+ if (isEagerDeserialize) {
+ dst.clearEagerDeserialize();
+ }
+ dst.setLastModified(mgr, getLastModified()); // fix for bug 31059
+ if (v == Token.INVALID) {
+ dst.setInvalid();
+ }
+ else if (v == Token.LOCAL_INVALID) {
+ dst.setLocalInvalid();
+ }
+ else if (v == Token.TOMBSTONE) {
+ dst.setTombstone();
+ }
+ else if (v instanceof CachedDeserializable) {
+ // don't serialize here if it is not already serialized
+ // if(v instanceof ByteSource && CachedDeserializableFactory.preferObject()) {
+ // // For SQLFire we prefer eager deserialized
+ // dst.setEagerDeserialize();
+ // }
+
+ if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
+ dst.value = ((StoredObject) v).getDeserializedForReading();
+ } else {
+ /*if (v instanceof ByteSource && CachedDeserializableFactory.preferObject()) {
+ dst.value = v;
+ } else */ {
+ Object tmp = ((CachedDeserializable) v).getValue();
+ if (tmp instanceof byte[]) {
+ byte[] bb = (byte[]) tmp;
+ dst.value = bb;
+ } else {
+ try {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(
+ Version.CURRENT);
+ BlobHelper.serializeTo(tmp, hdos);
+ hdos.trim();
+ dst.value = hdos;
+ } catch (IOException e) {
+ RuntimeException e2 = new IllegalArgumentException(
+ LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING
+ .toLocalizedString());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+ dst.setSerialized(true);
+ }
+ }
+ }
+ else if (v instanceof byte[]) {
+ dst.value = v;
+ }
+ else {
+ Object preparedValue = v;
+ if (preparedValue != null) {
+ preparedValue = prepareValueForGII(preparedValue);
+ if (preparedValue == null) {
+ return false;
+ }
+ }
+ if (CachedDeserializableFactory.preferObject()) {
+ dst.value = preparedValue;
+ dst.setEagerDeserialize();
+ }
+ else {
+ try {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ BlobHelper.serializeTo(preparedValue, hdos);
+ hdos.trim();
+ dst.value = hdos;
+ dst.setSerialized(true);
+ } catch (IOException e) {
+ RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * To fix bug 49901 if v is a GatewaySenderEventImpl then make
+ * a heap copy of it if it is offheap.
+ * @return the value to provide to the gii request; null if no value should be provided.
+ */
+ public static Object prepareValueForGII(Object v) {
+ assert v != null;
+ if (v instanceof GatewaySenderEventImpl) {
+ return ((GatewaySenderEventImpl) v).makeHeapCopyIfOffHeap();
+ } else {
+ return v;
+ }
+ }
+
+ public boolean isOverflowedToDisk(LocalRegion r, DistributedRegion.DiskPosition dp) {
+ return false;
+ }
+
+ @Override
+ public Object getValue(RegionEntryContext context) {
+ ReferenceCountHelper.createReferenceCountOwner();
+ @Retained Object result = _getValueRetain(context, true);
+ //Asif: If the thread is an Index Creation Thread & the value obtained is
+ //Token.REMOVED , we can skip synchronization block. This is required to prevent
+ // the dead lock caused if an Index Update Thread has gone into a wait holding the
+ // lock of the Entry object. There should not be an issue if the Index creation thread
+ // gets the temporary value of token.REMOVED as the correct value will get indexed
+ // by the Index Update Thread , once the index creation thread has exited.
+ // Part of Bugfix # 33336
+ // if ((result == Token.REMOVED_PHASE1 || result == Token.REMOVED_PHASE2) && !r.isIndexCreationThread()) {
+ // synchronized (this) {
+ // result = _getValue();
+ // }
+ // }
+
+ if (Token.isRemoved(result)) {
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ return null;
+ } else {
+ result = OffHeapHelper.copyAndReleaseIfNeeded(result); // sqlf does not dec ref count in this call
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ setRecentlyUsed();
+ return result;
+ }
+ }
+
+ @Override
+ @Retained
+ public Object getValueRetain(RegionEntryContext context) {
+ @Retained Object result = _getValueRetain(context, true);
+ if (Token.isRemoved(result)) {
+ return null;
+ } else {
+ setRecentlyUsed();
+ return result;
+ }
+ }
+
+ @Override
+ @Released
+ public void setValue(RegionEntryContext context, @Unretained Object value) throws RegionClearedException {
+ // @todo darrel: This will mark new entries as being recently used
+ // It might be better to only mark them when they are modified.
+ // Or should we only mark them on reads?
+ setValue(context,value,true);
+ }
+
+ @Override
+ public void setValue(RegionEntryContext context, Object value, EntryEventImpl event) throws RegionClearedException {
+ setValue(context,value);
+ }
+
+ @Released
+ protected void setValue(RegionEntryContext context, @Unretained Object value, boolean recentlyUsed) {
+ _setValue(value);
+ if (value != null && context != null && (this instanceof OffHeapRegionEntry)
+ && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) {
+ ((OffHeapRegionEntry)this).release();
+ ((LocalRegion)context).checkReadiness();
+ }
+ if (recentlyUsed) {
+ setRecentlyUsed();
+ }
+ }
+
+ /**
+ * This method determines if the value is in a compressed representation and decompresses it if it is.
+ *
+ * @param context the values context.
+ * @param value a region entry value.
+ *
+ * @return the decompressed form of the value parameter.
+ */
+ static Object decompress(RegionEntryContext context,Object value) {
+ if(isCompressible(context, value)) {
+ long time = context.getCachePerfStats().startDecompression();
+ value = EntryEventImpl.deserialize(context.getCompressor().decompress((byte[]) value));
+ context.getCachePerfStats().endDecompression(time);
+ }
+
+ return value;
+ }
+
+ static protected Object compress(RegionEntryContext context,Object value) {
+ return compress(context, value, null);
+ }
+
+ /**
+ * This method determines if the value is compressible and compresses it if it is.
+ *
+ * @param context the values context.
+ * @param value a region entry value.
+ *
+ * @return the compressed form of the value parameter.
+ */
+ static protected Object compress(RegionEntryContext context,Object value, EntryEventImpl event) {
+ if(isCompressible(context, value)) {
+ long time = context.getCachePerfStats().startCompression();
+ byte[] serializedValue;
+ if (event != null && event.getCachedSerializedNewValue() != null) {
+ serializedValue = event.getCachedSerializedNewValue();
+ if (value instanceof CachedDeserializable) {
+ CachedDeserializable cd = (CachedDeserializable) value;
+ if (!(cd.getValue() instanceof byte[])) {
+ // The cd now has the object form so use the cached serialized form in a new cd.
+ // This serialization is much cheaper than reserializing the object form.
+ serializedValue = EntryEventImpl.serialize(CachedDeserializableFactory.create(serializedValue));
+ } else {
+ serializedValue = EntryEventImpl.serialize(cd);
+ }
+ }
+ } else {
+ serializedValue = EntryEventImpl.serialize(value);
+ if (event != null && !(value instanceof byte[])) {
+ // See if we can cache the serialized new value in the event.
+ // If value is a byte[] then we don't cache it since it is not serialized.
+ if (value instanceof CachedDeserializable) {
+ // For a CacheDeserializable we want to only cache the wrapped value;
+ // not the serialized CacheDeserializable.
+ CachedDeserializable cd = (CachedDeserializable) value;
+ Object cdVal = cd.getValue();
+ if (cdVal instanceof byte[]) {
+ event.setCachedSerializedNewValue((byte[])cdVal);
+ }
+ } else {
+ event.setCachedSerializedNewValue(serializedValue);
+ }
+ }
+ }
+ value = context.getCompressor().compress(serializedValue);
+ context.getCachePerfStats().endCompression(time, serializedValue.length, ((byte []) value).length);
+ }
+
+ return value;
+ }
+
+ private static byte[] compressBytes(RegionEntryContext context, byte[] uncompressedBytes) {
+ byte[] result = uncompressedBytes;
+ if (isCompressible(context, uncompressedBytes)) {
+ long time = context.getCachePerfStats().startCompression();
+ result = context.getCompressor().compress(uncompressedBytes);
+ context.getCachePerfStats().endCompression(time, uncompressedBytes.length, result.length);
+ }
+ return result;
+ }
+
+
+ @Retained
+ public final Object getValueInVM(RegionEntryContext context) {
+ ReferenceCountHelper.createReferenceCountOwner();
+ @Retained Object v = _getValueRetain(context, true);
+
+ if (v == null) { // should only be possible if disk entry
+ v = Token.NOT_AVAILABLE;
+ }
+ @Retained Object result = OffHeapHelper.copyAndReleaseIfNeeded(v); // TODO OFFHEAP keep it offheap?
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ return result;
+ }
+
+ @Retained
+ public Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) {
+ return getValueInVM(owner);
+ }
+
+ @Override
+ @Retained
+ public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) {
+ @Retained Object result = _getValueRetain(owner, true);
+ // if (result instanceof ByteSource) {
+ // // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it
+ // Object deserVal = ((CachedDeserializable)result).getDeserializedForReading();
+ // if (deserVal != result) {
+ // OffHeapHelper.release(result);
+ // result = deserVal;
+ // }
+ // }
+ return result;
+ }
+
+ public Object getValueOnDisk(LocalRegion r)
+ throws EntryNotFoundException
+ {
+ throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString());
+ }
+
+ public Object getSerializedValueOnDisk(final LocalRegion r)
+ throws EntryNotFoundException
+ {
+ throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString());
+ }
+
+ public Object getValueOnDiskOrBuffer(LocalRegion r)
+ throws EntryNotFoundException
+ {
+ throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString());
+ // @todo darrel if value is Token.REMOVED || Token.DESTROYED throw EntryNotFoundException
+ }
+
+ public final boolean initialImagePut(final LocalRegion region,
+ final long lastModifiedTime,
+ Object newValue,
+ boolean wasRecovered,
+ boolean versionTagAccepted) throws RegionClearedException
+ {
+ // note that the caller has already write synced this RegionEntry
+ return initialImageInit(region, lastModifiedTime, newValue, this.isTombstone(), wasRecovered, versionTagAccepted);
+ }
+
+ public boolean initialImageInit(final LocalRegion region,
+ final long lastModifiedTime,
+ final Object newValue,
+ final boolean create,
+ final boolean wasRecovered,
+ final boolean versionTagAccepted) throws RegionClearedException
+ {
+ // note that the caller has already write synced this RegionEntry
+ boolean result = false;
+ // if it has been destroyed then don't do anything
+ Token vTok = getValueAsToken();
+ if (versionTagAccepted || create || (vTok != Token.DESTROYED || vTok != Token.TOMBSTONE)) { // OFFHEAP noop
+ Object newValueToWrite = newValue;
+ boolean putValue = versionTagAccepted || create
+ || (newValueToWrite != Token.LOCAL_INVALID
+ && (wasRecovered || (vTok == Token.LOCAL_INVALID))); // OFFHEAP noop
+
+ if (region.isUsedForPartitionedRegionAdmin() && newValueToWrite instanceof CachedDeserializable) {
+ // Special case for partitioned region meta data
+ // We do not need the RegionEntry on this case.
+ // Because the pr meta data region will not have an LRU.
+ newValueToWrite = ((CachedDeserializable) newValueToWrite).getDeserializedValue(region, null);
+ if (!create && newValueToWrite instanceof Versionable) {
+ @Retained @Released final Object oldValue = getValueInVM(region); // Heap value should always be deserialized at this point // OFFHEAP will not be deserialized
+ try {
+ // BUGFIX for 35029. If oldValue is null the newValue should be put.
+ if(oldValue == null) {
+ putValue = true;
+ }
+ else if (oldValue instanceof Versionable) {
+ Versionable nv = (Versionable) newValueToWrite;
+ Versionable ov = (Versionable) oldValue;
+ putValue = nv.isNewerThan(ov);
+ }
+ } finally {
+ OffHeapHelper.release(oldValue);
+ }
+ }
+ }
+
+ if (putValue) {
+ // change to INVALID if region itself has been invalidated,
+ // and current value is recovered
+ if (create || versionTagAccepted) {
+ // At this point, since we now always recover from disk first,
+ // we only care about "isCreate" since "isRecovered" is impossible
+ // if we had a regionInvalidate or regionClear
+ ImageState imageState = region.getImageState();
+ // this method is called during loadSnapshot as well as getInitialImage
+ if (imageState.getRegionInvalidated()) {
+ if (newValueToWrite != Token.TOMBSTONE) {
+ newValueToWrite = Token.INVALID;
+ }
+ }
+ else if (imageState.getClearRegionFlag()) {
+ boolean entryOK = false;
+ RegionVersionVector rvv = imageState.getClearRegionVersionVector();
+ if (rvv != null) { // a filtered clear
+ VersionSource id = getVersionStamp().getMemberID();
+ if (id == null) {
+ id = region.getVersionMember();
+ }
+ if (!rvv.contains(id, getVersionStamp().getRegionVersion())) {
+ entryOK = true;
+ }
+ }
+ if (!entryOK) {
+ //Asif: If the region has been issued cleared during
+ // the GII , then those entries loaded before this one would have
+ // been cleared from the Map due to clear operation & for the
+ // currententry whose key may have escaped the clearance , will be
+ // cleansed by the destroy token.
+ newValueToWrite = Token.DESTROYED;
+ imageState.addDestroyedEntry(this.getKey());
+ throw new RegionClearedException(LocalizedStrings.AbstractRegionEntry_DURING_THE_GII_PUT_OF_ENTRY_THE_REGION_GOT_CLEARED_SO_ABORTING_THE_OPERATION.toLocalizedString());
+ }
+ }
+ }
+ setValue(region, this.prepareValueForCache(region, newValueToWrite, false));
+ result = true;
+
+ if (newValueToWrite != Token.TOMBSTONE){
+ if (create) {
+ region.getCachePerfStats().incCreates();
+ }
+ region.updateStatsForPut(this, lastModifiedTime, false);
+ }
+
+ if (logger.isTraceEnabled()) {
+ if (newValueToWrite instanceof CachedDeserializable) {
+ logger.trace("ProcessChunk: region={}; put a CachedDeserializable ({},{})",
+ region.getFullPath(), getKey(),((CachedDeserializable)newValueToWrite).getStringForm());
+ }
+ else {
+ logger.trace("ProcessChunk: region={}; put({},{})", region.getFullPath(), getKey(), StringUtils.forceToString(newValueToWrite));
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @throws EntryNotFoundException if expectedOldValue is
+ * not null and is not equal to current value
+ */
+ @Released
+ public final boolean destroy(LocalRegion region,
+ EntryEventImpl event,
+ boolean inTokenMode,
+ boolean cacheWrite,
+ @Unretained Object expectedOldValue,
+ boolean forceDestroy,
+ boolean removeRecoveredEntry)
+ throws CacheWriterException,
+ EntryNotFoundException,
+ TimeoutException,
+ RegionClearedException {
+ boolean proceed = false;
+ {
+ // A design decision was made to not retrieve the old value from the disk
+ // if the entry has been evicted to only have the CacheListener afterDestroy
+ // method ignore it. We don't want to pay the performance penalty. The
+ // getValueInVM method does not retrieve the value from disk if it has been
+ // evicted. Instead, it uses the NotAvailable token.
+ //
+ // If the region is a WAN queue region, the old value is actually used by the
+ // afterDestroy callback on a secondary. It is not needed on a primary.
+ // Since the destroy that sets WAN_QUEUE_TOKEN always originates on the primary
+ // we only pay attention to WAN_QUEUE_TOKEN if the event is originRemote.
+ //
+ // :ezoerner:20080814 We also read old value from disk or buffer
+ // in the case where there is a non-null expectedOldValue
+ // see PartitionedRegion#remove(Object key, Object value)
+ ReferenceCountHelper.skipRefCountTracking();
+ @Retained @Released Object curValue = _getValueRetain(region, true);
+ ReferenceCountHelper.unskipRefCountTracking();
+ try {
+ if (curValue == null) curValue = Token.NOT_AVAILABLE;
+
+ if (curValue == Token.NOT_AVAILABLE) {
+ // In some cases we need to get the current value off of disk.
+
+ // if the event is transmitted during GII and has an old value, it was
+ // the state of the transmitting cache's entry & should be used here
+ if (event.getCallbackArgument() != null
+ && event.getCallbackArgument().equals(RegionQueue.WAN_QUEUE_TOKEN)
+ && event.isOriginRemote()) { // check originRemote for bug 40508
+ //curValue = getValue(region); can cause deadlock if GII is occurring
+ curValue = getValueOnDiskOrBuffer(region);
+ }
+ else {
+ FilterProfile fp = region.getFilterProfile();
+ // rdubey: Old value also required for SqlfIndexManager.
+ if (fp != null && ((fp.getCqCount() > 0) || expectedOldValue != null
+ || event.getRegion().getIndexUpdater() != null)) {
+ //curValue = getValue(region); can cause deadlock will fault in the value
+ // and will confuse LRU. rdubey.
+ curValue = getValueOnDiskOrBuffer(region);
+ }
+ }
+ }
+
+ if (expectedOldValue != null) {
+ if (!checkExpectedOldValue(expectedOldValue, curValue, region)) {
+ throw new EntryNotFoundException(
+ LocalizedStrings.AbstractRegionEntry_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString());
+ }
+ }
+
+ if (inTokenMode && event.hasOldValue()) {
+ proceed = true;
+ }
+ else {
+ proceed = event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl) || removeRecoveredEntry
+ || forceDestroy || region.getConcurrencyChecksEnabled() // fix for bug #47868 - create a tombstone
+ || (event.getOperation() == Operation.REMOVE // fix for bug #42242
+ && (curValue == null || curValue == Token.LOCAL_INVALID
+ || curValue == Token.INVALID));
+ }
+ } finally {
+ OffHeapHelper.releaseWithNoTracking(curValue);
+ }
+ } // end curValue block
+
+ if (proceed) {
+ //Generate the version tag if needed. This method should only be
+ //called if we are in fact going to destroy the entry, so it must be
+ //after the entry not found exception above.
+ if(!removeRecoveredEntry) {
+ region.generateAndSetVersionTag(event, this);
+ }
+ if (cacheWrite) {
+ region.cacheWriteBeforeDestroy(event, expectedOldValue);
+ if (event.getRegion().getServerProxy() != null) { // server will return a version tag
+ // update version information (may throw ConcurrentCacheModificationException)
+ VersionStamp stamp = getVersionStamp();
+ if (stamp != null) {
+ stamp.processVersionTag(event);
+ }
+ }
+ }
+ region.recordEvent(event);
+ // don't do index maintenance on a destroy if the value in the
+ // RegionEntry (the old value) is invalid
+ if (!region.isProxy() && !isInvalid()) {
+ IndexManager indexManager = region.getIndexManager();
+ if (indexManager != null) {
+ try {
+ if(isValueNull()) {
+ @Released Object value = getValueOffHeapOrDiskWithoutFaultIn(region);
+ try {
+ _setValue(prepareValueForCache(region, value, false));
+ if (value != null && region != null && (this instanceof OffHeapRegionEntry) && region.isThisRegionBeingClosedOrDestroyed()) {
+ ((OffHeapRegionEntry)this).release();
+ region.checkReadiness();
+ }
+ } finally {
+ OffHeapHelper.release(value);
+ }
+ }
+ indexManager.updateIndexes(this,
+ IndexManager.REMOVE_ENTRY,
+ IndexProtocol.OTHER_OP);
+ }
+ catch (QueryException e) {
+ throw new IndexMaintenanceException(e);
+ }
+ }
+ }
+
+ boolean removeEntry = false;
+ VersionTag v = event.getVersionTag();
+ if (region.concurrencyChecksEnabled && !removeRecoveredEntry
+ && !event.isFromRILocalDestroy()) { // bug #46780, don't retain tombstones for entries destroyed for register-interest
+ // Destroy will write a tombstone instead
+ if (v == null || !v.hasValidVersion()) {
+ // localDestroy and eviction and ops received with no version tag
+ // should create a tombstone using the existing version stamp, as should
+ // (bug #45245) responses from servers that do not have valid version information
+ VersionStamp stamp = this.getVersionStamp();
+ if (stamp != null) { // proxy has no stamps
+ v = stamp.asVersionTag();
+ event.setVersionTag(v);
+ }
+ }
+ removeEntry = (v == null) || !v.hasValidVersion();
+ } else {
+ removeEntry = true;
+ }
+
+ // See #47887, we do not insert a tombstone for evicted HDFS
+ // entries since the value is still present in HDFS
+ // Check if we have to evict or just do destroy.
+ boolean forceRemoveEntry =
+ (event.isEviction() || event.isExpiration())
+ && event.getRegion().isUsedForPartitionedRegionBucket()
+ && event.getRegion().getPartitionedRegion().isHDFSRegion();
+
+ if (removeEntry || forceRemoveEntry) {
+ boolean isThisTombstone = isTombstone();
+ if(inTokenMode && !event.getOperation().isEviction()) {
+ setValue(region, Token.DESTROYED);
+ } else {
+ removePhase1(region, false);
+ }
+ if (isThisTombstone) {
+ region.unscheduleTombstone(this);
+ }
+ } else {
+ makeTombstone(region, v);
+ }
+
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+
+
+ static boolean checkExpectedOldValue(@Unretained Object expectedOldValue, @Unretained Object actualValue, LocalRegion lr) {
+ if (Token.isInvalid(expectedOldValue)) {
+ return (actualValue == null) || Token.isInvalid(actualValue);
+ } else {
+ boolean isCompressedOffHeap = lr.getAttributes().getOffHeap() && lr.getAttributes().getCompressor() != null;
+ return checkEquals(expectedOldValue, actualValue, isCompressedOffHeap);
+ }
+ }
+
+ private static boolean basicEquals(Object v1, Object v2) {
+ if (v2 != null) {
+ if (v2.getClass().isArray()) {
+ // fix for 52093
+ if (v2 instanceof byte[]) {
+ if (v1 instanceof byte[]) {
+ return Arrays.equals((byte[])v2, (byte[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof Object[]) {
+ if (v1 instanceof Object[]) {
+ return Arrays.deepEquals((Object[])v2, (Object[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof int[]) {
+ if (v1 instanceof int[]) {
+ return Arrays.equals((int[])v2, (int[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof long[]) {
+ if (v1 instanceof long[]) {
+ return Arrays.equals((long[])v2, (long[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof boolean[]) {
+ if (v1 instanceof boolean[]) {
+ return Arrays.equals((boolean[])v2, (boolean[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof short[]) {
+ if (v1 instanceof short[]) {
+ return Arrays.equals((short[])v2, (short[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof char[]) {
+ if (v1 instanceof char[]) {
+ return Arrays.equals((char[])v2, (char[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof float[]) {
+ if (v1 instanceof float[]) {
+ return Arrays.equals((float[])v2, (float[])v1);
+ } else {
+ return false;
+ }
+ } else if (v2 instanceof double[]) {
+ if (v1 instanceof double[]) {
+ return Arrays.equals((double[])v2, (double[])v1);
+ } else {
+ return false;
+ }
+ }
+ // fall through and call equals method
+ }
+ return v2.equals(v1);
+ } else {
+ return v1 == null;
+ }
+ }
+
+ static boolean checkEquals(@Unretained Object v1, @Unretained Object v2, boolean isCompressedOffHeap) {
+ // need to give PdxInstance#equals priority
+ if (v1 instanceof PdxInstance) {
+ return checkPdxEquals((PdxInstance)v1, v2);
+ } else if (v2 instanceof PdxInstance) {
+ return checkPdxEquals((PdxInstance)v2, v1);
+ } else if (v1 instanceof OffHeapCachedDeserializable) {
+ return checkOffHeapEquals((OffHeapCachedDeserializable)v1, v2);
+ } else if (v2 instanceof OffHeapCachedDeserializable) {
+ return checkOffHeapEquals((OffHeapCachedDeserializable)v2, v1);
+ } else if (v1 instanceof CachedDeserializable) {
+ return checkCDEquals((CachedDeserializable)v1, v2, isCompressedOffHeap);
+ } else if (v2 instanceof CachedDeserializable) {
+ return checkCDEquals((CachedDeserializable)v2, v1, isCompressedOffHeap);
+ } else {
+ return basicEquals(v1, v2);
+ }
+ }
+ private static boolean checkOffHeapEquals(@Unretained OffHeapCachedDeserializable cd, @Unretained Object obj) {
+ if (cd.isSerializedPdxInstance()) {
+ PdxInstance pi = InternalDataSerializer.readPdxInstance(cd.getSerializedValue(), GemFireCacheImpl.getForPdx("Could not check value equality"));
+ return checkPdxEquals(pi, obj);
+ }
+ if (obj instanceof OffHeapCachedDeserializable) {
+ return cd.checkDataEquals((OffHeapCachedDeserializable)obj);
+ } else {
+ byte[] serializedObj;
+ if (obj instanceof CachedDeserializable) {
+ if (!cd.isSerialized()) {
+ if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
+ // both are byte[]
+ // obj must be DataAsAddress since it was not OffHeapCachedDeserializable
+ // so its byte[] will be small.
+ byte[] objBytes = (byte[]) ((StoredObject) obj).getDeserializedForReading();
+ return cd.checkDataEquals(objBytes);
+ } else {
+ return false;
+ }
+ }
+ serializedObj = ((CachedDeserializable) obj).getSerializedValue();
+ } else if (obj instanceof byte[]) {
+ if (cd.isSerialized()) {
+ return false;
+ }
+ serializedObj = (byte[]) obj;
+ } else {
+ if (!cd.isSerialized()) {
+ return false;
+ }
+ if (obj == null || obj == Token.NOT_AVAILABLE
+ || Token.isInvalidOrRemoved(obj)) {
+ return false;
+ }
+ serializedObj = EntryEventImpl.serialize(obj);
+ }
+ return cd.checkDataEquals(serializedObj);
+ }
+ }
+
+ private static boolean checkCDEquals(CachedDeserializable cd, Object obj, boolean isCompressedOffHeap) {
+ if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) {
+ // cd is an actual byte[].
+ byte[] ba2;
+ if (obj instanceof StoredObject) {
+ if (!((StoredObject) obj).isSerialized()) {
+ return false;
+ }
+ ba2 = (byte[]) ((StoredObject) obj).getDeserializedForReading();
+ } else if (obj instanceof byte[]) {
+ ba2 = (byte[]) obj;
+ } else {
+ return false;
+ }
+ byte[] ba1 = (byte[]) cd.getDeserializedForReading();
+ return Arrays.equals(ba1, ba2);
+ }
+ Object cdVal = cd.getValue();
+ if (cdVal instanceof byte[]) {
+ byte[] cdValBytes = (byte[])cdVal;
+ PdxInstance pi = InternalDataSerializer.readPdxInstance(cdValBytes, GemFireCacheImpl.getForPdx("Could not check value equality"));
+ if (pi != null) {
+ return checkPdxEquals(pi, obj);
+ }
+ if (isCompressedOffHeap) { // fix for bug 52248
+ byte[] serializedObj;
+ if (obj instanceof CachedDeserializable) {
+ serializedObj = ((CachedDeserializable) obj).getSerializedValue();
+ } else {
+ serializedObj = EntryEventImpl.serialize(obj);
+ }
+ return Arrays.equals(cdValBytes, serializedObj);
+ } else {
+ /**
+ * To be more compatible with previous releases do not compare the serialized forms here.
+ * Instead deserialize and call the equals method.
+ */
+ Object deserializedObj;
+ if (obj instanceof CachedDeserializable) {
+ deserializedObj =((CachedDeserializable) obj).getDeserializedForReading();
+ } else {
+ if (obj == null || obj == Token.NOT_AVAILABLE
+ || Token.isInvalidOrRemoved(obj)) {
+ return false;
+ }
+ // TODO OPTIMIZE: Before serializing all of obj we could get the top
+ // level class name of cdVal and compare it to the top level class name of obj.
+ deserializedObj = obj;
+ }
+ return basicEquals(deserializedObj, cd.getDeserializedForReading());
+ }
+ // boolean result = Arrays.equals((byte[])cdVal, serializedObj);
+ // if (!result) {
+ // try {
+ // Object o1 = BlobHelper.deserializeBlob((byte[])cdVal);
+ // Object o2 = BlobHelper.deserializeBlob(serializedObj);
+ // SimpleMemoryAllocatorImpl.debugLog("checkCDEquals o1=<" + o1 + "> o2=<" + o2 + ">", false);
+ // if (o1.equals(o2)) {
+ // SimpleMemoryAllocatorImpl.debugLog("they are equal! a1=<" + Arrays.toString((byte[])cdVal) + "> a2=<" + Arrays.toString(serializedObj) + ">", false);
+ // }
+ // } catch (IOException e) {
+ // // TODO Auto-generated catch block
+ // e.printStackTrace();
+ // } catch (ClassNotFoundException e) {
+ // // TODO Auto-generated catch block
+ // e.printStackTrace();
+ // }
+ // }
+ // return result;
+ } else {
+ // prefer object form
+ if (obj instanceof CachedDeserializable) {
+ // TODO OPTIMIZE: Before deserializing all of obj we could get the top
+ // class name of cdVal and the top level class name of obj and compare.
+ obj = ((CachedDeserializable) obj).getDeserializedForReading();
+ }
+ return basicEquals(cdVal, obj);
+ }
+ }
+ /**
+ * This method fixes bug 43643
+ */
+ private static boolean checkPdxEquals(PdxInstance pdx, Object obj) {
+ if (!(obj instanceof PdxInstance)) {
+ // obj may be a CachedDeserializable in which case we want to convert it to a PdxInstance even if we are not readSerialized.
+ if (obj instanceof CachedDeserializable) {
+ if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
+ // obj is actually a byte[] which will never be equal to a PdxInstance
+ return false;
+ }
+ Object cdVal = ((CachedDeserializable) obj).getValue();
+ if (cdVal instanceof byte[]) {
+ byte[] cdValBytes = (byte[]) cdVal;
+ PdxInstance pi = InternalDataSerializer.readPdxInstance(cdValBytes, GemFireCacheImpl.getForPdx("Could not check value equality"));
+ if (pi != null) {
+ return pi.equals(pdx);
+ } else {
+ // since obj is serialized as something other than pdx it must not equal our pdx
+ return false;
+ }
+ } else {
+ // remove the cd wrapper so that obj is the actual value we want to compare.
+ obj = cdVal;
+ }
+ }
+ if (obj.getClass().getName().equals(pdx.getClassName())) {
+ GemFireCacheImpl gfc = GemFireCacheImpl.getForPdx("Could not access Pdx registry");
+ if (gfc != null) {
+ PdxSerializer pdxSerializer;
+ if (obj instanceof PdxSerializable) {
+ pdxSerializer = null;
+ } else {
+ pdxSerializer = gfc.getPdxSerializer();
+ }
+ if (pdxSerializer != null || obj instanceof PdxSerializable) {
+ // try to convert obj to a PdxInstance
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ try {
+ if (InternalDataSerializer.autoSerialized(obj, hdos) ||
+ InternalDataSerializer.writePdx(hdos, gfc, obj, pdxSerializer)) {
+ PdxInstance pi = InternalDataSerializer.readPdxInstance(hdos.toByteArray(), gfc);
+ if (pi != null) {
+ obj = pi;
+ }
+ }
+ } catch (IOException ignore) {
+ // we are not able to convert it so just fall through
+ } catch (PdxSerializationException ignore) {
+ // we are not able to convert it so just fall through
+ }
+ }
+ }
+ }
+ }
+ return basicEquals(obj, pdx);
+ }
+
+
+ /////////////////////////////////////////////////////////////
+ /////////////////////////// fields //////////////////////////
+ /////////////////////////////////////////////////////////////
+ // Do not add any instance fields to this class.
+ // Instead add them to LeafRegionEntry.cpp
+
+ public static class HashRegionEntryCreator implements
+ CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> {
+
+ public HashEntry<Object, Object> newEntry(final Object key, final int hash,
+ final HashEntry<Object, Object> next, final Object value) {
+ final AbstractRegionEntry entry = (AbstractRegionEntry)value;
+ // if hash is already set then assert that the two should be same
+ final int entryHash = entry.getEntryHash();
+ if (hash == 0 || entryHash != 0) {
+ if (entryHash != hash) {
+ Assert.fail("unexpected mismatch of hash, expected=" + hash
+ + ", actual=" + entryHash + " for " + entry);
+ }
+ }
+ entry.setEntryHash(hash);
+ entry.setNextEntry(next);
+ return entry;
+ }
+
+ public int keyHashCode(final Object key, final boolean compareValues) {
+ return CustomEntryConcurrentHashMap.keyHash(key, compareValues);
+ }
+ };
+
+ public abstract Object getKey();
+
+ protected static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) {
+ if (v == null) return false;
+ if (Token.isInvalidOrRemoved(v)) return false;
+ if (v == Token.NOT_AVAILABLE) return false;
+ if (v instanceof DiskEntry.RecoveredEntry) return false; // The disk layer has special logic that ends up storing the nested value in the RecoveredEntry off heap
+ if (!(e instanceof OffHeapRegionEntry)) return false;
+ // TODO should we check for deltas here or is that a user error?
+ return true;
+ }
+
+ /**
+ * Default implementation. Override in subclasses with primitive keys
+ * to prevent creating an Object form of the key for each equality check.
+ */
+ @Override
+ public boolean isKeyEqual(Object k) {
+ return k.equals(getKey());
+ }
+
+ private static final long LAST_MODIFIED_MASK = 0x00FFFFFFFFFFFFFFL;
+
+ protected final void _setLastModified(long lastModifiedTime) {
+ if (lastModifiedTime < 0 || lastModifiedTime > LAST_MODIFIED_MASK) {
+ throw new IllegalStateException("Expected lastModifiedTime " + lastModifiedTime + " to be >= 0 and <= " + LAST_MODIFIED_MASK);
+ }
+ long storedValue;
+ long newValue;
+ do {
+ storedValue = getlastModifiedField();
+ newValue = storedValue & ~LAST_MODIFIED_MASK;
+ newValue |= lastModifiedTime;
+ } while (!compareAndSetLastModifiedField(storedValue, newValue));
+ }
+ protected abstract long getlastModifiedField();
+ protected abstract boolean compareAndSetLastModifiedField(long expectedValue, long newValue);
+ public final long getLastModified() {
+ return getlastModifiedField() & LAST_MODIFIED_MASK;
+ }
+ protected final boolean areAnyBitsSet(long bitMask) {
+ return ( getlastModifiedField() & bitMask ) != 0L;
+ }
+ /**
+ * Any bits in "bitMask" that are 1 will be set.
+ */
+ protected final void setBits(long bitMask) {
+ boolean done = false;
+ do {
+ long bits = getlastModifiedField();
+ long newBits = bits | bitMask;
+ if (bits == newBits) return;
+ done = compareAndSetLastModifiedField(bits, newBits);
+ } while(!done);
+ }
+ /**
+ * Any bits in "bitMask" that are 0 will be cleared.
+ */
+ protected final void clearBits(long bitMask) {
+ boolean done = false;
+ do {
+ long bits = getlastModifiedField();
+ long newBits = bits & bitMask;
+ if (bits == newBits) return;
+ done = compareAndSetLastModifiedField(bits, newBits);
+ } while(!done);
+ }
+
+ @Override
+ @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
+ public Object prepareValueForCache(RegionEntryContext r,
+ @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val,
+ boolean isEntryUpdate) {
+ return prepareValueForCache(r, val, null, isEntryUpdate);
+ }
+
+ @Override
+ @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
+ public Object prepareValueForCache(RegionEntryContext r,
+ @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val,
+ EntryEventImpl event, boolean isEntryUpdate) {
+ if (r != null && r.getOffHeap() && okToStoreOffHeap(val, this)) {
+ if (val instanceof StoredObject) {
+ // Check to see if val has the same compression settings as this region.
+ // The recursive calls in this section are safe because
+ // we only do it after copy the off-heap value to the heap.
+ // This is needed to fix bug 52057.
+ StoredObject soVal = (StoredObject) val;
+ assert !soVal.isCompressed();
+ if (r.getCompressor() != null) {
+ // val is uncompressed and we need a compressed value.
+ // So copy the off-heap value to the heap in a form that can be compressed.
+ byte[] valAsBytes = soVal.getValueAsHeapByteArray();
+ Object heapValue;
+ if (soVal.isSerialized()) {
+ heapValue = CachedDeserializableFactory.create(valAsBytes);
+ } else {
+ heapValue = valAsBytes;
+ }
+ return prepareValueForCache(r, heapValue, event, isEntryUpdate);
+ }
- if (val instanceof Chunk) {
++ if (val instanceof ObjectChunk) {
+ // if the reused guy has a refcount then need to inc it
- if (!((Chunk)val).retain()) {
++ if (!((ObjectChunk)val).retain()) {
+ throw new IllegalStateException("Could not use an off heap value because it was freed");
+ }
+ }
+ // else it is DataAsAddress. This code just returns it as prepared.
+ // TODO OFFHEAP: Review the callers to see if they will handle DataAsAddress correctly.
+ } else {
+ byte[] data;
+ boolean isSerialized = !(val instanceof byte[]);
+ if (isSerialized) {
+ if (event != null && event.getCachedSerializedNewValue() != null) {
+ data = event.getCachedSerializedNewValue();
+ } else if (val instanceof CachedDeserializable) {
+ data = ((CachedDeserializable)val).getSerializedValue();
+ // TODO OFFHEAP: cache data in event?
+ } else if (val instanceof PdxInstance) {
+ try {
+ data = ((ConvertableToBytes)val).toBytes();
+ // TODO OFFHEAP: cache data in event?
+ } catch (IOException e) {
+ throw new PdxSerializationException("Could not convert " + val + " to bytes", e);
+ }
+ } else {
+ data = EntryEventImpl.serialize(val);
+ // TODO OFFHEAP: cache data in event?
+ }
+ } else {
+ data = (byte[]) val;
+ }
+ byte[] compressedData = compressBytes(r, data);
+ boolean isCompressed = compressedData != data;
+ ReferenceCountHelper.setReferenceCountOwner(this);
+ MemoryAllocator ma = SimpleMemoryAllocatorImpl.getAllocator(); // fix for bug 47875
- val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed, GemFireChunk.TYPE); // TODO:KIRK:48068 race happens right after this line
++ val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed); // TODO:KIRK:48068 race happens right after this line
+ ReferenceCountHelper.setReferenceCountOwner(null);
- if (val instanceof GemFireChunk) {
- val = new com.gemstone.gemfire.internal.offheap.ChunkWithHeapForm((GemFireChunk)val, data);
++ if (val instanceof ObjectChunk) {
++ val = new ObjectChunkWithHeapForm((ObjectChunk)val, data);
+ }
+ // if (val instanceof Chunk && r instanceof LocalRegion) {
+ // Chunk c = (Chunk) val;
+ // LocalRegion lr = (LocalRegion) r;
+ // SimpleMemoryAllocatorImpl.debugLog("allocated @" + Long.toHexString(c.getMemoryAddress()) + " reg=" + lr.getFullPath(), false);
+ // }
+ }
+ return val;
+ }
+ @Unretained Object nv = val;
+ if (nv instanceof StoredObject) {
+ // This off heap value is being put into a on heap region.
+ byte[] data = ((StoredObject) nv).getSerializedValue();
+ nv = CachedDeserializableFactory.create(data);
+ }
+ // don't bother checking for SQLFire
+ if (!GemFireCacheImpl.sqlfSystem() && nv instanceof PdxInstanceImpl) {
+ // We do not want to put PDXs in the cache as values.
+ // So get the serialized bytes and use a CachedDeserializable.
+ try {
+ byte[] data = ((ConvertableToBytes)nv).toBytes();
+ byte[] compressedData = compressBytes(r, data);
+ if (data == compressedData) {
+ nv = CachedDeserializableFactory.create(data);
+ } else {
+ nv = compressedData;
+ }
+ } catch (IOException e) {
+ throw new PdxSerializationException("Could not convert " + nv + " to bytes", e);
+ }
+ } else {
+ nv = compress(r, nv, event);
+ }
+ return nv;
+ }
+
+ @Override
+ @Unretained
+ public final Object _getValue() {
+ return getValueField();
+ }
+
+ public final boolean isUpdateInProgress() {
+ return areAnyBitsSet(UPDATE_IN_PROGRESS);
+ }
+
+ public final void setUpdateInProgress(final boolean underUpdate) {
+ if (underUpdate) {
+ setBits(UPDATE_IN_PROGRESS);
+ } else {
+ clearBits(~UPDATE_IN_PROGRESS);
+ }
+ }
+
+
+ public final boolean isCacheListenerInvocationInProgress() {
+ return areAnyBitsSet(LISTENER_INVOCATION_IN_PROGRESS);
+ }
+
+ public final void setCacheListenerInvocationInProgress(final boolean listenerInvoked) {
+ if (listenerInvoked) {
+ setBits(LISTENER_INVOCATION_IN_PROGRESS);
+ } else {
+ clearBits(~LISTENER_INVOCATION_IN_PROGRESS);
+ }
+ }
+
+ @Override
+ public final boolean isInUseByTransaction() {
+ return areAnyBitsSet(IN_USE_BY_TX);
+ }
+
+ @Override
+ public final void setInUseByTransaction(final boolean v) {
+ if (v) {
+ setBits(IN_USE_BY_TX);
+ } else {
+ clearBits(~IN_USE_BY_TX);
+ }
+ }
+
+ @Override
+ public final synchronized void incRefCount() {
+ TXManagerImpl.incRefCount(this);
+ setInUseByTransaction(true);
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean isMarkedForEviction() {
+ return areAnyBitsSet(MARKED_FOR_EVICTION);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void setMarkedForEviction() {
+ setBits(MARKED_FOR_EVICTION);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void clearMarkedForEviction() {
+ clearBits(~MARKED_FOR_EVICTION);
+ }
+
+ @Override
+ public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) {
+ if (TXManagerImpl.decRefCount(this)) {
+ if (isInUseByTransaction()) {
+ setInUseByTransaction(false);
+ if (lruList != null) {
+ // No more transactions, place in lru list
+ lruList.appendEntry((LRUClockNode)this);
+ }
+ if (lr != null && lr.isEntryExpiryPossible()) {
+ lr.addExpiryTaskIfAbsent(this);
+ }
+ }
+ }
+ }
+
+ @Override
+ public final synchronized void resetRefCount(NewLRUClockHand lruList) {
+ if (isInUseByTransaction()) {
+ setInUseByTransaction(false);
+ if (lruList != null) {
+ lruList.appendEntry((LRUClockNode)this);
+ }
+ }
+ }
+ /**
+ * soubhik: this method is overridden in sqlf flavor of entries.
+ * Instead of overriding this method; override areSetValue.
+ */
+ protected final void _setValue(Object val) {
+ setValueField(val);
+ }
+
+ @Override
+ public Token getValueAsToken() {
+ Object v = getValueField();
+ if (v == null || v instanceof Token) {
+ return (Token)v;
+ } else {
+ return Token.NOT_A_TOKEN;
+ }
+ }
+
+ /**
+ * Reads the value of this region entry.
+ * Provides low level access to the value field.
+ * @return possible OFF_HEAP_OBJECT (caller uses region entry reference)
+ */
+ @Unretained
+ protected abstract Object getValueField();
+ /**
+ * Set the value of this region entry.
+ * Provides low level access to the value field.
+ * @param v the new value to set
+ */
+ protected abstract void setValueField(@Unretained Object v);
+
+ @Retained
+ public Object getTransformedValue() {
+ return _getValueRetain(null, false);
+ }
+
+ public final boolean getValueWasResultOfSearch() {
+ return areAnyBitsSet(VALUE_RESULT_OF_SEARCH);
+ }
+
+ public final void setValueResultOfSearch(boolean v) {
+ if (v) {
+ setBits(VALUE_RESULT_OF_SEARCH);
+ } else {
+ clearBits(~VALUE_RESULT_OF_SEARCH);
+ }
+ }
+
+ public boolean hasValidVersion() {
+ VersionStamp stamp = (VersionStamp)this;
+ boolean has = stamp.getRegionVersion() != 0 || stamp.getEntryVersion() != 0;
+ return has;
+ }
+
+ public boolean hasStats() {
+ // override this in implementations that have stats
+ return false;
+ }
+
+ /**
+ * @see HashEntry#getMapValue()
+ */
+ public final Object getMapValue() {
+ return this;
+ }
+
+ /**
+ * @see HashEntry#setMapValue(Object)
+ */
+ public final void setMapValue(final Object newValue) {
+ if (this != newValue) {
+ Assert.fail("AbstractRegionEntry#setMapValue: unexpected setMapValue "
+ + "with newValue=" + newValue + ", this=" + this);
+ }
+ }
+
+ protected abstract void setEntryHash(int v);
+
+ @Override
+ public final String toString() {
+ final StringBuilder sb = new StringBuilder(this.getClass().getSimpleName())
+ .append('@').append(Integer.toHexString(System.identityHashCode(this)))
+ .append(" (");
+ return appendFieldsToString(sb).append(')').toString();
+ }
+
+ protected StringBuilder appendFieldsToString(final StringBuilder sb) {
+ sb.append("key=").append(getKey()).append("; rawValue=")
+ .append(_getValue()); // OFFHEAP _getValue ok: the current toString on OffHeapCachedDeserializable is safe to use without incing refcount.
+ VersionStamp stamp = getVersionStamp();
+ if (stamp != null) {
+ sb.append("; version=").append(stamp.asVersionTag()+";member="+stamp.getMemberID());
+ }
+ return sb;
+ }
+
+ /*
+ * (non-Javadoc)
+ * This generates version tags for outgoing messages for all subclasses
+ * supporting concurrency versioning. It also sets the entry's version
+ * stamp to the tag's values.
+ *
+ * @see com.gemstone.gemfire.internal.cache.RegionEntry#generateVersionTag(com.gemstone.gemfire.distributed.DistributedMember, boolean)
+ */
+ public VersionTag generateVersionTag(VersionSource mbr, boolean withDelta, LocalRegion region, EntryEventImpl event) {
+ VersionStamp stamp = this.getVersionStamp();
+ if (stamp != null && region.getServerProxy() == null) { // clients do not generate versions
+ int v = stamp.getEntryVersion()+1;
+ if (v > 0xFFFFFF) {
+ v -= 0x1000000; // roll-over
+ }
+ VersionSource previous = stamp.getMemberID();
+
+
+ //For non persistent regions, we allow the member to be null and
+ //when we send a message and the remote side can determine the member
+ //from the sender. For persistent regions, we need to send
+ //the persistent id to the remote side.
+ //
+ //TODO - RVV - optimize the way we send the persistent id to save
+ //space.
+ if(mbr == null) {
+ VersionSource regionMember = region.getVersionMember();
+ if(regionMember instanceof DiskStoreID) {
+ mbr = regionMember;
+ }
+ }
+
+ VersionTag tag = VersionTag.create(mbr);
+ tag.setEntryVersion(v);
+ if (region.getVersionVector() != null) {
+ // Use region version if already provided, else generate
+ long nextRegionVersion = event.getNextRegionVersion();
+ if (nextRegionVersion != -1) {
+ // Set on the tag and record it locally
+ tag.setRegionVersion(nextRegionVersion);
+ RegionVersionVector rvv = region.getVersionVector();
+ rvv.recordVersion(rvv.getOwnerId(),nextRegionVersion);
+ if (logger.isDebugEnabled()) {
+ logger.debug("recorded region version {}; region={}", nextRegionVersion, region.getFullPath());
+ }
+ } else {
+ tag.setRegionVersion(region.getVersionVector().getNextVersion());
+ }
+ }
+ if (withDelta) {
+ tag.setPreviousMemberID(previous);
+ }
+ VersionTag remoteTag = event.getVersionTag();
+ if (remoteTag != null && remoteTag.isGatewayTag()) {
+ // if this event was received from a gateway we use the remote system's
+ // timestamp and dsid.
+ tag.setVersionTimeStamp(remoteTag.getVersionTimeStamp());
+ tag.setDistributedSystemId(remoteTag.getDistributedSystemId());
+ tag.setAllowedByResolver(remoteTag.isAllowedByResolver());
+ } else {
+ long time = region.cacheTimeMillis();
+ int dsid = region.getDistributionManager().getDistributedSystemId();
+ // a locally generated change should always have a later timestamp than
+ // one received from a wan gateway, so fake a timestamp if necessary
+ if (time <= stamp.getVersionTimeStamp() && dsid != tag.getDistributedSystemId()) {
+ time = stamp.getVersionTimeStamp() + 1;
+ }
+ tag.setVersionTimeStamp(time);
+ tag.setDistributedSystemId(dsid);
+ }
+ stamp.setVersions(tag);
+ stamp.setMemberID(mbr);
+ event.setVersionTag(tag);
+ if (logger.isDebugEnabled()) {
+ logger.debug("generated tag {}; key={}; oldvalue={} newvalue={} client={} region={}; rvv={}", tag,
+ event.getKey(), event.getOldValueStringForm(), event.getNewValueStringForm(),
+ (event.getContext() == null? "none" : event.getContext().getDistributedMember().getName()),
+ region.getFullPath(), region.getVersionVector());
+ }
+ return tag;
+ }
+ return null;
+ }
+
+ /** set/unset the flag noting that a tombstone has been scheduled for this entry */
+ public void setTombstoneScheduled(boolean scheduled) {
+ if (scheduled) {
+ setBits(TOMBSTONE_SCHEDULED);
+ } else {
+ clearBits(~TOMBSTONE_SCHEDULED);
+ }
+ }
+
+ /**
+ * return the flag noting whether a tombstone has been scheduled for this entry. This should
+ * be called under synchronization on the region entry if you want an accurate result.
+ */
+ public boolean isTombstoneScheduled() {
+ return areAnyBitsSet(TOMBSTONE_SCHEDULED);
+ }
+
+ /*
+ * (non-Javadoc)
+ * This performs a concurrency check.
+ *
+ * This check compares the version number first, followed by the member ID.
+ *
+ * Wraparound of the version number is detected and handled by extending the
+ * range of versions by one bit.
+ *
+ * The normal membership ID comparison method is used.<p>
+ *
+ * Note that a tag from a remote (WAN) system may be in the event. If this
+ * is the case this method will either invoke a user plugin that allows/disallows
+ * the event (and may modify the value) or it determines whether to allow
+ * or disallow the event based on timestamps and distributedSystemIDs.
+ *
+ * @throws ConcurrentCacheModificationException if the event conflicts with
+ * an event that has already been applied to the entry.
+ *
+ * @see com.gemstone.gemfire.internal.cache.RegionEntry#concurrencyCheck(com.gemstone.gemfire.cache.EntryEvent)
+ */
+ public void processVersionTag(EntryEvent cacheEvent) {
+ processVersionTag(cacheEvent, true);
+ }
+
+
+ protected void processVersionTag(EntryEvent cacheEvent, boolean conflictCheck) {
+ EntryEventImpl event = (EntryEventImpl)cacheEvent;
+ VersionTag tag = event.getVersionTag();
+ if (tag == null) {
+ return;
+ }
+
+ try {
+ if (tag.isGatewayTag()) {
+ // this may throw ConcurrentCacheModificationException or modify the event
+ if (processGatewayTag(cacheEvent)) {
+ return;
+ }
+ assert false : "processGatewayTag failure - returned false";
+ }
+
+ if (!tag.isFromOtherMember()) {
+ if (!event.getOperation().isNetSearch()) {
+ // except for netsearch, all locally-generated tags can be ignored
+ return;
+ }
+ }
+
+ final InternalDistributedMember originator = (InternalDistributedMember)event.getDistributedMember();
+ final VersionSource dmId = event.getRegion().getVersionMember();
+ LocalRegion r = event.getLocalRegion();
+ boolean eventHasDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null;
+
+ VersionStamp stamp = getVersionStamp();
+ // bug #46223, an event received from a peer or a server may be from a different
+ // distributed system than the last modification made to this entry so we must
+ // perform a gateway conflict check
+ if (stamp != null && !tag.isAllowedByResolver()) {
+ int stampDsId = stamp.getDistributedSystemId();
+ int tagDsId = tag.getDistributedSystemId();
+
+ if (stampDsId != 0 && stampDsId != tagDsId && stampDsId != -1) {
+ StringBuilder verbose = null;
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ verbose = new StringBuilder();
+ verbose.append("processing tag for key " + getKey() + ", stamp=" + stamp.asVersionTag() + ", tag=").append(tag);
+ }
+ long stampTime = stamp.getVersionTimeStamp();
+ long tagTime = tag.getVersionTimeStamp();
+ if (stampTime > 0 && (tagTime > stampTime
+ || (tagTime == stampTime && tag.getDistributedSystemId() >= stamp.getDistributedSystemId()))) {
+ if (verbose != null) {
+ verbose.append(" - allowing event");
+ logger.trace(LogMarker.TOMBSTONE, verbose);
+ }
+ // Update the stamp with event's version information.
+ applyVersionTag(r, stamp, tag, originator);
+ return;
+ }
+
+ if (stampTime > 0) {
+ if (verbose != null) {
+ verbose.append(" - disallowing event");
+ logger.trace(LogMarker.TOMBSTONE, verbose);
+ }
+ r.getCachePerfStats().incConflatedEventsCount();
+ persistConflictingTag(r, tag);
+ throw new ConcurrentCacheModificationException("conflicting event detected");
+ }
+ }
+ }
+
+ if (r.getVersionVector() != null &&
+ r.getServerProxy() == null &&
+ (r.getDataPolicy().withPersistence() ||
+ !r.getScope().isLocal())) { // bug #45258 - perf degradation for local regions and RVV
+ VersionSource who = tag.getMemberID();
+ if (who == null) {
+ who = originator;
+ }
+ r.getVersionVector().recordVersion(who, tag);
+ }
+
+ assert !tag.isFromOtherMember() || tag.getMemberID() != null : "remote tag is missing memberID";
+
+
+ // [bruce] for a long time I had conflict checks turned off in clients when
+ // receiving a response from a server and applying it to the cache. This lowered
+ // the CPU cost of versioning but eventually had to be pulled for bug #45453
+ // if (r.getServerProxy() != null && conflictCheck) {
+ // // events coming from servers while a local sync is held on the entry
+ // // do not require a conflict check. Conflict checks were already
+ // // performed on the server and here we just consume whatever was sent back.
+ // // Event.isFromServer() returns true for client-update messages and
+ // // for putAll/getAll, which do not hold syncs during the server operation.
+ // conflictCheck = event.isFromServer();
+ // }
+ // else
+
+ // [bruce] for a very long time we had conflict checks turned off for PR buckets.
+ // Bug 45669 showed a primary dying in the middle of distribution. This caused
+ // one backup bucket to have a v2. The other bucket was promoted to primary and
+ // generated a conflicting v2. We need to do the check so that if this second
+ // v2 loses to the original one in the delta-GII operation that the original v2
+ // will be the winner in both buckets.
+ // if (r.isUsedForPartitionedRegionBucket()) {
+ // conflictCheck = false; // primary/secondary model
+ // }
+
+ // The new value in event is not from GII, even it could be tombstone
+ basicProcessVersionTag(r, tag, false, eventHasDelta, dmId, originator, conflictCheck);
+ } catch (ConcurrentCacheModificationException ex) {
+ event.isConcurrencyConflict(true);
+ throw ex;
+ }
+ }
+
+ protected final void basicProcessVersionTag(LocalRegion region, VersionTag tag, boolean isTombstoneFromGII,
+ boolean deltaCheck, VersionSource dmId, InternalDistributedMember sender, boolean checkForConflict) {
+
+ StringBuilder verbose = null;
+
+ if (tag != null) {
+ VersionStamp stamp = getVersionStamp();
+
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ VersionTag stampTag = stamp.asVersionTag();
+ if (stampTag.hasValidVersion() && checkForConflict) { // only be verbose here if there's a possibility we might reject the operation
+ verbose = new StringBuilder();
+ verbose.append("processing tag for key " + getKey() + ", stamp=" + stamp.asVersionTag() + ", tag=").append(tag)
+ .append(", checkForConflict=").append(checkForConflict); //.append(", current value=").append(_getValue());
+ }
+ }
+
+ if (stamp == null) {
+ throw new IllegalStateException("message contained a version tag but this region has no version storage");
+ }
+
+ boolean apply = true;
+
+ try {
+ if (checkForConflict) {
+ apply = checkForConflict(region, stamp, tag, isTombstoneFromGII, deltaCheck, dmId, sender, verbose);
+ }
+ } catch (ConcurrentCacheModificationException e) {
+ // Even if we don't apply the operation we should always retain the
+ // highest timestamp in order for WAN conflict checks to work correctly
+ // because the operation may have been sent to other systems and been
+ // applied there
+ if (!tag.isGatewayTag()
+ && stamp.getDistributedSystemId() == tag.getDistributedSystemId()
+ && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) {
+ stamp.setVersionTimeStamp(tag.getVersionTimeStamp());
+ tag.setTimeStampApplied(true);
+ if (verbose != null) {
+ verbose.append("\nThough in conflict the tag timestamp was more recent and was recorded.");
+ }
+ }
+ throw e;
+ } finally {
+ if (verbose != null) {
+ logger.trace(LogMarker.TOMBSTONE, verbose);
+ }
+ }
+
+ if (apply) {
+ applyVersionTag(region, stamp, tag, sender);
+ }
+ }
+ }
+
+
+ private void applyVersionTag(LocalRegion region, VersionStamp stamp, VersionTag tag, InternalDistributedMember sender) {
+ // stamp.setPreviousMemberID(stamp.getMemberID());
+ VersionSource mbr = tag.getMemberID();
+ if (mbr == null) {
+ mbr = sender;
+ }
+ mbr = region.getVersionVector().getCanonicalId(mbr);
+ tag.setMemberID(mbr);
+ stamp.setVersions(tag);
+ if (tag.hasPreviousMemberID()) {
+ if (tag.getPreviousMemberID() == null) {
+ tag.setPreviousMemberID(stamp.getMemberID());
+ } else {
+ tag.setPreviousMemberID(region.getVersionVector().getCanonicalId(
+ tag.getPreviousMemberID()));
+ }
+ }
+ }
+
+ /** perform conflict checking for a stamp/tag */
+ protected boolean checkForConflict(LocalRegion region,
+ VersionStamp stamp, VersionTag tag,
+ boolean isTombstoneFromGII,
+ boolean deltaCheck, VersionSource dmId,
+ InternalDistributedMember sender, StringBuilder verbose) {
+
+ int stampVersion = stamp.getEntryVersion();
+ int tagVersion = tag.getEntryVersion();
+
+ boolean throwex = false;
+ boolean apply = false;
+
+ if (stamp.getVersionTimeStamp() != 0) { // new entries have no timestamp
+ // check for wrap-around on the version number
+ long difference = tagVersion - stampVersion;
+ if (0x10000 < difference || difference < -0x10000) {
+ if (verbose != null) {
+ verbose.append("\nversion rollover detected: tag="+tagVersion + " stamp=" + stampVersion);
+ }
+ if (difference < 0) {
+ tagVersion += 0x1000000L;
+ } else {
+ stampVersion += 0x1000000L;
+ }
+ }
+ }
+ if (verbose != null) {
+ verbose.append("\nstamp=v").append(stampVersion)
+ .append(" tag=v").append(tagVersion);
+ }
+
+ if (deltaCheck) {
+ checkForDeltaConflict(region, stampVersion, tagVersion, stamp, tag, dmId, sender, verbose);
+ }
+
+ if (stampVersion == 0 || stampVersion < tagVersion) {
+ if (verbose != null) { verbose.append(" - applying change"); }
+ apply = true;
+ } else if (stampVersion > tagVersion) {
+ if (overwritingOldTombstone(region, stamp, tag, verbose) && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) {
+ apply = true;
+ } else {
+ // check for an incoming expired tombstone from an initial image chunk.
+ if (tagVersion > 0
+ && isExpiredTombstone(region, tag.getVersionTimeStamp(), isTombstoneFromGII)
+ && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) {
+ // A special case to apply: when remote entry is expired tombstone, then let local vs remote with newer timestamp to win
+ if (verbose != null) { verbose.append(" - applying change in Delta GII"); }
+ apply = true;
+ } else {
+ if (verbose != null) { verbose.append(" - disallowing"); }
+ throwex= true;
+ }
+ }
+ } else {
+ if (overwritingOldTombstone(region, stamp, tag, verbose)) {
+ apply = true;
+ } else {
+ // compare member IDs
+ VersionSource stampID = stamp.getMemberID();
+ if (stampID == null) {
+ stampID = dmId;
+ }
+ VersionSource tagID = tag.getMemberID();
+ if (tagID == null) {
+ tagID = sender;
+ }
+ if (verbose != null) { verbose.append("\ncomparing IDs"); }
+ int compare = stampID.compareTo(tagID);
+ if (compare < 0) {
+ if (verbose != null) { verbose.append(" - applying change"); }
+ apply = true;
+ } else if (compare > 0) {
+ if (verbose != null) { verbose.append(" - disallowing"); }
+ throwex = true;
+ } else if (tag.isPosDup()) {
+ if (verbose != null) { verbose.append(" - disallowing duplicate marked with posdup"); }
+ throwex = true;
+ } else /* if (isTombstoneFromGII && isTombstone()) {
+ if (verbose != null) { verbose.append(" - disallowing duplicate tombstone from GII"); }
+ return false; // bug #49601 don't schedule tombstones from GII if there's already one here
+ } else */ {
+ if (verbose != null) { verbose.append(" - allowing duplicate"); }
+ }
+ }
+ }
+
+ if (!apply && throwex) {
+ region.getCachePerfStats().incConflatedEventsCount();
+ persistConflictingTag(region, tag);
+ throw new ConcurrentCacheModificationException();
+ }
+
+ return apply;
+ }
+
+ private boolean isExpiredTombstone(LocalRegion region, long timestamp, boolean isTombstone) {
+ return isTombstone && (timestamp + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
+ }
+
+ private boolean overwritingOldTombstone(LocalRegion region, VersionStamp stamp, VersionTag tag, StringBuilder verbose) {
+ // Tombstone GC does not use locking to stop operations when old tombstones
+ // are being removed. Because of this we might get an operation that was applied
+ // in another VM that has just reaped a tombstone and is now using a reset
+ // entry version number. Because of this we check the timestamp on the current
+ // local entry and see if it is old enough to have expired. If this is the case
+ // we accept the change and allow the tag to be recorded
+ long stampTime = stamp.getVersionTimeStamp();
+ if (isExpiredTombstone(region, stampTime, this.isTombstone())) {
+ // no local change since the tombstone would have timed out - accept the change
+ if (verbose != null) { verbose.append(" - accepting because local timestamp is old"); }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ protected void persistConflictingTag(LocalRegion region, VersionTag tag) {
+ // only persist region needs to persist conflict tag
+ }
+
+ /**
+ * for an event containing a delta we must check to see if the tag's
+ * previous member id is the stamp's member id and ensure that the
+ * version is only incremented by 1. Otherwise the delta is being
+ * applied to a value that does not match the source of the delta.
+ *
+ * @throws InvalidDeltaException
+ */
+ private void checkForDeltaConflict(LocalRegion region,
+ long stampVersion, long tagVersion,
+ VersionStamp stamp, VersionTag tag,
+ VersionSource dmId, InternalDistributedMember sender,
+ StringBuilder verbose) {
+
+ if (tagVersion != stampVersion+1) {
+ if (verbose != null) {
+ verbose.append("\ndelta requires full value due to version mismatch");
+ }
+ region.getCachePerfStats().incDeltaFailedUpdates();
+ throw new InvalidDeltaException("delta cannot be applied due to version mismatch");
+
+ } else {
+ // make sure the tag was based on the value in this entry by checking the
+ // tag's previous-changer ID against this stamp's current ID
+ VersionSource stampID = stamp.getMemberID();
+ if (stampID == null) {
+ stampID = dmId;
+ }
+ VersionSource tagID = tag.getPreviousMemberID();
+ if (tagID == null) {
+ tagID = sender;
+ }
+ if (!tagID.equals(stampID)) {
+ if (verbose != null) {
+ verbose.append("\ndelta requires full value. tag.previous=")
+ .append(tagID).append(" but stamp.current=").append(stampID);
+ }
+ region.getCachePerfStats().incDeltaFailedUpdates();
+ throw new InvalidDeltaException("delta cannot be applied due to version ID mismatch");
+ }
+ }
+ }
+
+ private boolean processGatewayTag(EntryEvent cacheEvent) {
+ // Gateway tags are installed in the server-side LocalRegion cache
+ // modification methods. They do not have version numbers or distributed
+ // member IDs. Instead they only have timestamps and distributed system IDs.
+
+ // If there is a resolver plug-in, invoke it. Otherwise we use the timestamps and
+ // distributed system IDs to determine whether to allow the event to proceed.
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ if (this.isRemoved() && !this.isTombstone()) {
+ return true; // no conflict on a new entry
+ }
+ EntryEventImpl event = (EntryEventImpl)cacheEvent;
+ VersionTag tag = event.getVersionTag();
+ long stampTime = getVersionStamp().getVersionTimeStamp();
+ long tagTime = tag.getVersionTimeStamp();
+ int stampDsid = getVersionStamp().getDistributedSystemId();
+ int tagDsid = tag.getDistributedSystemId();
+ if (isDebugEnabled) {
+ logger.debug("processing gateway version information for {}. Stamp dsid={} time={} Tag dsid={} time={}",
+ event.getKey(), stampDsid, stampTime, tagDsid, tagTime);
+ }
+ if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) {
+ return true; // no timestamp received from other system - just apply it
+ }
+ if (tagDsid == stampDsid || stampDsid == -1) {
+ return true;
+ }
+ GatewayConflictResolver resolver = event.getRegion().getCache().getGatewayConflictResolver();
+ if (resolver != null) {
+ if (isDebugEnabled) {
+ logger.debug("invoking gateway conflict resolver");
+ }
+ final boolean[] disallow = new boolean[1];
+ final Object[] newValue = new Object[] { this };
+ GatewayConflictHelper helper = new GatewayConflictHelper() {
+ @Override
+ public void disallowEvent() {
+ disallow[0] = true;
+ }
+
+ @Override
+ public void changeEventValue(Object v) {
+ newValue[0] = v;
+ }
+ };
+ TimestampedEntryEventImpl timestampedEvent =
+ (TimestampedEntryEventImpl)event.getTimestampedEvent(tagDsid, stampDsid, tagTime, stampTime);
+
+ // gateway conflict resolvers will usually want to see the old value
+ if (!time
<TRUNCATED>